Skip to content

Commit

Permalink
Expand support of Crunchy Bridge system metrics (#480)
Browse files Browse the repository at this point in the history
Use the Crunchy Bridge API to obtain disk usage metrics. These were previously inaccurate: the collector reported the disk usage of its own container rather than that of the Postgres server.
Also use the API to obtain some basic information about the cluster, such as region and plan, and report it to pganalyze.
  • Loading branch information
keiko713 authored Nov 30, 2023
1 parent 311fe6f commit 5eede28
Show file tree
Hide file tree
Showing 9 changed files with 950 additions and 432 deletions.
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ type ServerConfig struct {
GcpCredentialsFile string `ini:"gcp_credentials_file"`

CrunchyBridgeClusterID string `ini:"crunchy_bridge_cluster_id"`
CrunchyBridgeAPIKey string `ini:"crunchy_bridge_api_key"`

AivenProjectID string `ini:"aiven_project_id"`
AivenServiceID string `ini:"aiven_service_id"`
Expand Down
6 changes: 6 additions & 0 deletions config/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,12 @@ func getDefaultConfig() *ServerConfig {
if gcpProjectID := os.Getenv("GCP_PROJECT_ID"); gcpProjectID != "" {
config.GcpProjectID = gcpProjectID
}
if crunchyBridgeClusterID := os.Getenv("CRUNCHY_BRIDGE_CLUSTER_ID"); crunchyBridgeClusterID != "" {
config.CrunchyBridgeClusterID = crunchyBridgeClusterID
}
if crunchyBridgeAPIKey := os.Getenv("CRUNCHY_BRIDGE_API_KEY"); crunchyBridgeAPIKey != "" {
config.CrunchyBridgeAPIKey = crunchyBridgeAPIKey
}
if logLocation := os.Getenv("LOG_LOCATION"); logLocation != "" {
config.LogLocation = logLocation
}
Expand Down
263 changes: 263 additions & 0 deletions input/system/crunchy_bridge/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,263 @@
package crunchy_bridge

import (
"encoding/json"
"fmt"
"io"
"net/http"

"github.com/pganalyze/collector/util"
)

// apiBaseURL is the root URL of the Crunchy Bridge API. It is used as the
// Client's BaseURL, to which request paths are appended.
const apiBaseURL = "https://api.crunchybridge.com"

// Client talks to the Crunchy Bridge API on behalf of a single cluster.
// It embeds http.Client, which is what actually sends the requests.
type Client struct {
http.Client

// BaseURL is prepended to every request path by NewRequest.
BaseURL string
// BearerToken is sent in the Authorization header as "Bearer <token>".
BearerToken string
// ClusterID identifies the cluster whose info and metrics are requested.
ClusterID string
}

// ClusterInfo holds the fields of the cluster details response that
// GetClusterInfo decodes. Other fields in the response are ignored.
type ClusterInfo struct {
// CPU is reported onwards by the collector as the cluster's CPU units.
CPU int32 `json:"cpu"`
// CreatedAt is kept as the raw string; the system state code parses it as RFC 3339.
CreatedAt string `json:"created_at"`
// Memory is reported onwards by the collector as memory in GB.
Memory float32 `json:"memory"`
// Name is the cluster name.
Name string `json:"name"`
// PlanID, ProviderID and RegionID are passed through to pganalyze unchanged.
PlanID string `json:"plan_id"`
ProviderID string `json:"provider_id"`
RegionID string `json:"region_id"`
// Storage is treated as GB by the system state code, which multiplies it
// by 1024^3 to derive the total disk size in bytes.
Storage int32 `json:"storage"`
}

// MetricViews is the decoded response of a single metric-views request
// (for example the "cpu" or "disk-usage" view).
type MetricViews struct {
// Name is the "name" field of the response; presumably the view name — confirm against the API docs.
Name string `json:"name"`
// Series holds the individual metric series contained in the view.
Series []MetricSeries `json:"series"`
}

// MetricSeries is one named series of data points within a metric view.
type MetricSeries struct {
// IsEmpty is decoded from the response but not consulted by the code in this file.
IsEmpty bool `json:"is_empty"`
// Name is the series identifier the Get*Metrics methods match on (e.g. "cpu_load_user").
Name string `json:"name"`
// Points are the data points of the series; they are reduced with average().
Points []MetricPoint `json:"points"`
// Title is decoded from the response but not consulted by the code in this file.
Title string `json:"title"`
}

// MetricPoint is a single sample of a metric series.
type MetricPoint struct {
// Time is kept as the raw string from the response; it is not parsed in this file.
Time string `json:"time"`
// Value is the sampled value. A value of 0 is skipped when averaging (see average).
Value float64 `json:"value"`
}

// CPUMetrics carries the averaged values of the series found in the "cpu"
// metric view.
type CPUMetrics struct {
	// Averages of the cpu_load_iowait, cpu_load_system, cpu_load_user and
	// cpu_load_steal series, in that order.
	Iowait, System, User, Steal float64
}

// MemoryMetrics carries the averaged values of the series found in the
// "memory" metric view.
type MemoryMetrics struct {
	// Averages of the memory_used and swap_used series, in that order.
	// The field names suggest percentages; confirm the unit against the API docs.
	MemoryUsedPct, SwapUsedPct float64
}

// IOPSMetrics carries the averaged values of the series found in the "iops"
// metric view.
type IOPSMetrics struct {
	// Averages of the io_wtps (writes) and io_rtps (reads) series, in that order.
	Writes, Reads float64
}

// LoadAverageMetrics carries the averaged values of the series found in the
// "load-average" metric view.
type LoadAverageMetrics struct {
// One is the average of the load_average_1 series.
One float64
}

// DiskUsageMetrics carries the averaged values of the series found in the
// "disk-usage" metric view, truncated to whole numbers.
type DiskUsageMetrics struct {
	// Averages of the postgres_databases_size_bytes, postgres_log_size_bytes
	// and postgres_wal_size_bytes series, in that order.
	DatabaseSize, LogSize, WalSize uint64
}

// NewRequest builds a body-less HTTP request for the given method and API
// path. The path is appended to the client's BaseURL, and the request is
// given the bearer token, the collector's User-Agent and a JSON Accept header.
// The request is only constructed here, not sent.
func (c *Client) NewRequest(method string, path string) (*http.Request, error) {
	request, err := http.NewRequest(method, c.BaseURL+path, nil)
	if err != nil {
		return nil, err
	}

	// http.Header is a map, so writes through this alias land on the request.
	headers := request.Header
	headers.Add("Accept", "application/json")
	headers.Set("User-Agent", util.CollectorNameAndVersion)
	headers.Set("Authorization", "Bearer "+c.BearerToken)
	return request, nil
}

// GetClusterInfo fetches the details of the configured cluster from
// GET /clusters/{ClusterID} and decodes the JSON response into a ClusterInfo.
//
// A non-nil error is returned when the request cannot be built or sent, the
// body cannot be read, the API answers with a status other than 200 OK, the
// body is empty, or the body is not valid JSON. On success the returned
// pointer is never nil.
func (c *Client) GetClusterInfo() (*ClusterInfo, error) {
	req, err := c.NewRequest("GET", "/clusters/"+c.ClusterID)
	if err != nil {
		return nil, err
	}
	resp, err := c.Client.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}

	// An unexpected status or an empty body must surface as an error.
	// Returning (nil, nil) here would make callers dereference a nil
	// *ClusterInfo after a seemingly successful call.
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("cluster info request returned unexpected status %d", resp.StatusCode)
	}
	if len(body) == 0 {
		return nil, fmt.Errorf("cluster info request returned an empty response body")
	}

	clusterInfo := ClusterInfo{}
	if err := json.Unmarshal(body, &clusterInfo); err != nil {
		return nil, err
	}
	return &clusterInfo, nil
}

// getMetrics fetches the metric view with the given name (e.g. "cpu",
// "disk-usage") for the configured cluster over a 15 minute period and
// decodes the JSON response into a MetricViews.
//
// A non-nil error is returned when the request cannot be built or sent, the
// body cannot be read, the API answers with a status other than 200 OK, the
// body is empty, or the body is not valid JSON. On success the returned
// pointer is never nil.
func (c *Client) getMetrics(name string) (*MetricViews, error) {
	req, err := c.NewRequest("GET", fmt.Sprintf("/metric-views/%s?cluster_id=%s&period=15m", name, c.ClusterID))
	if err != nil {
		return nil, err
	}
	resp, err := c.Client.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}

	// An unexpected status or an empty body must surface as an error.
	// Returning (nil, nil) here would make the Get*Metrics wrappers
	// dereference a nil *MetricViews and panic.
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("metric view %q request returned unexpected status %d", name, resp.StatusCode)
	}
	if len(body) == 0 {
		return nil, fmt.Errorf("metric view %q request returned an empty response body", name)
	}

	metricViews := MetricViews{}
	if err := json.Unmarshal(body, &metricViews); err != nil {
		return nil, err
	}

	return &metricViews, nil
}

// GetCPUMetrics requests the "cpu" metric view and returns, for each known
// CPU series, the average of its data points (see average for how zero
// values are treated). Series with other names are ignored; fields whose
// series is missing stay at zero.
func (c *Client) GetCPUMetrics() (*CPUMetrics, error) {
	views, err := c.getMetrics("cpu")
	if err != nil {
		return nil, err
	}

	result := &CPUMetrics{}
	// Maps each series name to the field that receives its average.
	targets := map[string]*float64{
		"cpu_load_iowait": &result.Iowait,
		"cpu_load_system": &result.System,
		"cpu_load_user":   &result.User,
		"cpu_load_steal":  &result.Steal,
	}
	for i := range views.Series {
		entry := &views.Series[i]
		if field, known := targets[entry.Name]; known {
			*field = average(entry.Points)
		}
	}

	return result, nil
}

// GetMemoryMetrics requests the "memory" metric view and returns the averaged
// memory_used and swap_used series (see average for how zero values are
// treated). Series with other names are ignored.
func (c *Client) GetMemoryMetrics() (*MemoryMetrics, error) {
	views, err := c.getMetrics("memory")
	if err != nil {
		return nil, err
	}

	usage := new(MemoryMetrics)
	for _, entry := range views.Series {
		if entry.Name == "memory_used" {
			usage.MemoryUsedPct = average(entry.Points)
		} else if entry.Name == "swap_used" {
			usage.SwapUsedPct = average(entry.Points)
		}
	}

	return usage, nil
}

// GetIOPSMetrics requests the "iops" metric view and returns the averaged
// io_wtps (writes) and io_rtps (reads) series (see average for how zero
// values are treated). Series with other names are ignored.
func (c *Client) GetIOPSMetrics() (*IOPSMetrics, error) {
	views, err := c.getMetrics("iops")
	if err != nil {
		return nil, err
	}

	result := &IOPSMetrics{}
	// Maps each series name to the field that receives its average.
	targets := map[string]*float64{
		"io_wtps": &result.Writes,
		"io_rtps": &result.Reads,
	}
	for i := range views.Series {
		entry := &views.Series[i]
		field, known := targets[entry.Name]
		if !known {
			continue
		}
		*field = average(entry.Points)
	}

	return result, nil
}

// GetLoadAverageMetrics requests the "load-average" metric view and returns
// the averaged load_average_1 series (see average for how zero values are
// treated). Series with other names are ignored.
func (c *Client) GetLoadAverageMetrics() (*LoadAverageMetrics, error) {
	views, err := c.getMetrics("load-average")
	if err != nil {
		return nil, err
	}

	load := new(LoadAverageMetrics)
	for _, entry := range views.Series {
		if entry.Name != "load_average_1" {
			continue
		}
		load.One = average(entry.Points)
	}

	return load, nil
}

// GetDiskUsageMetrics requests the "disk-usage" metric view and returns the
// averaged database, log and WAL size series, each truncated to a uint64
// (see average for how zero values are treated). Series with other names
// are ignored.
func (c *Client) GetDiskUsageMetrics() (*DiskUsageMetrics, error) {
	views, err := c.getMetrics("disk-usage")
	if err != nil {
		return nil, err
	}

	result := &DiskUsageMetrics{}
	// Maps each series name to the field that receives its average.
	targets := map[string]*uint64{
		"postgres_databases_size_bytes": &result.DatabaseSize,
		"postgres_log_size_bytes":       &result.LogSize,
		"postgres_wal_size_bytes":       &result.WalSize,
	}
	for i := range views.Series {
		entry := &views.Series[i]
		if field, known := targets[entry.Name]; known {
			*field = uint64(average(entry.Points))
		}
	}

	return result, nil
}

// average returns the mean of all non-zero point values, or 0 when there are
// no such points.
//
// The metric-views endpoint covers the last 15 minutes, and the most recent
// point(s) are often reported as 0 because metrics collection on the Crunchy
// side lags behind. Zero values are therefore left out of the mean. This also
// drops genuine zeros (e.g. an idle load average), which is accepted for
// simplicity since the mean of such a series would be close to zero anyway.
func average(points []MetricPoint) float64 {
	var (
		total   float64
		nonZero int
	)
	for i := range points {
		value := points[i].Value
		if value == 0 {
			continue
		}
		total += value
		nonZero++
	}
	if nonZero == 0 {
		return 0
	}
	return total / float64(nonZero)
}
64 changes: 64 additions & 0 deletions input/system/crunchy_bridge/system.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package crunchy_bridge

import (
"time"

"github.com/pganalyze/collector/config"
"github.com/pganalyze/collector/input/system/selfhosted"
"github.com/pganalyze/collector/state"
"github.com/pganalyze/collector/util"
)

// GetSystemState - Gets system information about a Crunchy Bridge instance
//
// The state starts out as the self-hosted system state with its type set to
// Crunchy Bridge. When config.CrunchyBridgeAPIKey is set, the Crunchy Bridge
// API is additionally used to fill in cluster details (system.Info.CrunchyBridge)
// and to replace the disk partition and WAL usage figures. Any API failure is
// logged and the state gathered up to that point is returned.
func GetSystemState(config config.ServerConfig, logger *util.Logger) (system state.SystemState) {
	// With Crunchy Bridge, we are assuming that the collector is deployed on Container Apps,
	// which run directly on the database server. Most of the metrics can be obtained
	// using the same way as a self hosted server, since the container receives a bind mount
	// of /proc and /sys from the host. Note this excludes disk usage metrics, which we instead
	// get from the API.
	system = selfhosted.GetSystemState(config, logger)
	system.Info.Type = state.CrunchyBridgeSystem

	// When API key is provided, use API to obtain extra info including disk usage metrics
	if config.CrunchyBridgeAPIKey == "" {
		return
	}
	client := Client{Client: *config.HTTPClientWithRetry, BaseURL: apiBaseURL, BearerToken: config.CrunchyBridgeAPIKey, ClusterID: config.CrunchyBridgeClusterID}

	clusterInfo, err := client.GetClusterInfo()
	if err != nil {
		logger.PrintError("CrunchyBridge/System: Encountered error when getting cluster info %v\n", err)
		return
	}
	// Guard against a nil result without an error, which would otherwise
	// panic on the field accesses below.
	if clusterInfo == nil {
		logger.PrintError("CrunchyBridge/System: Cluster info request returned no data\n")
		return
	}
	system.Info.CrunchyBridge = &state.SystemInfoCrunchyBridge{
		ClusterName: clusterInfo.Name,
		PlanID:      clusterInfo.PlanID,
		ProviderID:  clusterInfo.ProviderID,
		RegionID:    clusterInfo.RegionID,
		CPUUnits:    clusterInfo.CPU,
		StorageGB:   clusterInfo.Storage,
		MemoryGB:    clusterInfo.Memory,
	}
	// Only record the creation time when it parses successfully; on failure
	// CreatedAt is left at its zero value (best effort, not worth aborting for).
	if parsedCreatedAt, err := time.Parse(time.RFC3339, clusterInfo.CreatedAt); err == nil {
		system.Info.CrunchyBridge.CreatedAt = parsedCreatedAt
	}

	diskUsageMetrics, err := client.GetDiskUsageMetrics()
	if err != nil {
		logger.PrintError("CrunchyBridge/System: Encountered error when getting cluster disk usage metrics %v\n", err)
		return
	}

	// Replace the partitions found by the self-hosted collection (which describe
	// the collector's container) with a single partition built from API data.
	system.DataDirectoryPartition = "/"
	system.DiskPartitions = make(state.DiskPartitionMap)
	// Manually specify the disk name to "md0" as that's the main disk
	system.DiskPartitions["/"] = state.DiskPartition{
		DiskName:      "md0",
		PartitionName: "md0",
		UsedBytes:     diskUsageMetrics.DatabaseSize,
		// Storage is treated as GB here and converted to bytes.
		TotalBytes: uint64(clusterInfo.Storage) * 1024 * 1024 * 1024,
	}
	system.XlogUsedBytes = diskUsageMetrics.WalSize

	return
}
6 changes: 2 additions & 4 deletions input/system/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/pganalyze/collector/config"
"github.com/pganalyze/collector/input/postgres"
"github.com/pganalyze/collector/input/system/crunchy_bridge"
"github.com/pganalyze/collector/input/system/rds"
"github.com/pganalyze/collector/input/system/selfhosted"
"github.com/pganalyze/collector/state"
Expand Down Expand Up @@ -42,10 +43,7 @@ func GetSystemState(config config.ServerConfig, logger *util.Logger, globalColle
} else if config.SystemType == "heroku" {
system.Info.Type = state.HerokuSystem
} else if config.SystemType == "crunchy_bridge" {
// We are assuming container apps are used, which means the collector
// runs on the database server itself and can gather local statistics
system = selfhosted.GetSystemState(config, logger)
system.Info.Type = state.CrunchyBridgeSystem
system = crunchy_bridge.GetSystemState(config, logger)
} else if config.SystemType == "aiven" {
system.Info.Type = state.AivenSystem
} else if dbHost == "" || dbHost == "localhost" || dbHost == "127.0.0.1" || config.AlwaysCollectSystemData {
Expand Down
Loading

0 comments on commit 5eede28

Please sign in to comment.