Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[crashtracker] Refactor receiver code into separate files. #818

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 137 additions & 0 deletions crashtracker/src/receiver/entry_points.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

use super::receive_report::{receive_report_from_stream, CrashReportStatus};
use crate::{CrashInfo, CrashtrackerConfiguration, StacktraceCollection};
use anyhow::Context;
use std::time::Duration;
use tokio::{
io::{AsyncBufReadExt, BufReader},
net::UnixListener,
};

/*-----------------------------------------
| Public API |
------------------------------------------*/

/// Runs the receiver synchronously, reading the crash report from this
/// process's standard input.
///
/// A single-threaded tokio runtime is created for the duration of the call and
/// the report is processed with the timeout returned by `receiver_timeout`.
///
/// # Errors
/// Returns an error if the runtime cannot be built or if receiving/processing
/// the report fails.
pub fn receiver_entry_point_stdin() -> anyhow::Result<()> {
    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()?;
    let stdin_reader = BufReader::new(tokio::io::stdin());
    runtime.block_on(receiver_entry_point(receiver_timeout(), stdin_reader))
}

/// Listens on the unix socket identified by `socket_path` and processes one
/// crash report per accepted connection.
///
/// * `socket_path` - name/path handed to `get_unix_socket` to create the listener.
/// * `one_shot` - when `true`, the function returns the result of processing
///   the first accepted connection. When `false`, it keeps accepting
///   connections indefinitely.
///
/// # Errors
/// Returns an error if the socket cannot be created or if accepting a
/// connection fails. In `one_shot` mode, an error from processing the report is
/// returned as well; otherwise such errors are logged to stderr and the loop
/// continues with the next connection.
pub async fn async_receiver_entry_point_unix_socket(
    socket_path: impl AsRef<str>,
    one_shot: bool,
) -> anyhow::Result<()> {
    let listener = get_unix_socket(socket_path)?;
    loop {
        let (unix_stream, _) = listener.accept().await?;
        let stream = BufReader::new(unix_stream);
        let res = receiver_entry_point(receiver_timeout(), stream).await;

        if one_shot {
            return res;
        }

        // A single bad report must not stop a long-running receiver, but
        // dropping the error silently would make failures impossible to
        // diagnose, so surface it on stderr before serving the next connection.
        if let Err(e) = res {
            eprintln!("Failed to process crash report: {e:#}");
        }
    }
}

/// Blocking wrapper around `async_receiver_entry_point_unix_socket` that
/// handles exactly one connection on the given unix socket.
///
/// A single-threaded tokio runtime is created for the duration of the call.
/// Once the connection has been handled its stream is dropped, which closes it
/// and allows the collector to exit if it was waiting.
///
/// # Errors
/// Returns an error if the runtime cannot be built, or whatever error the
/// async entry point reports.
pub fn receiver_entry_point_unix_socket(socket_path: impl AsRef<str>) -> anyhow::Result<()> {
    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()?;
    let one_shot = true;
    runtime.block_on(async_receiver_entry_point_unix_socket(socket_path, one_shot))
}

/*-----------------------------------------
| Helper Functions |
------------------------------------------*/

fn get_unix_socket(socket_path: impl AsRef<str>) -> anyhow::Result<UnixListener> {
fn path_bind(socket_path: impl AsRef<str>) -> anyhow::Result<UnixListener> {
let socket_path = socket_path.as_ref();
if std::fs::metadata(socket_path).is_ok() {
std::fs::remove_file(socket_path).with_context(|| {
format!("could not delete previous socket at {:?}", socket_path)
})?;
}
Ok(UnixListener::bind(socket_path)?)
}

#[cfg(target_os = "linux")]
let unix_listener = if socket_path.as_ref().starts_with(['.', '/']) {
path_bind(socket_path)
} else {
use std::os::linux::net::SocketAddrExt;
std::os::unix::net::SocketAddr::from_abstract_name(socket_path.as_ref())
.and_then(|addr| {
std::os::unix::net::UnixListener::bind_addr(&addr)
.and_then(|listener| {
listener.set_nonblocking(true)?;
Ok(listener)
})
.and_then(UnixListener::from_std)
})
.map_err(anyhow::Error::msg)
};
#[cfg(not(target_os = "linux"))]
let unix_listener = path_bind(socket_path);
unix_listener.context("Could not create the unix socket")
}

/// Reads a crash report sent by a crash collector over `stream`, turns it into
/// `CrashInfo` json, and emits it to the endpoint/file defined in the received
/// configuration.
///
/// The receiver exists because doing anything in a signal handler is
/// dangerous: a sidecar is forked to do the work that is not allowed in the
/// handler. See the comments in `crashtracker/lib.rs` for a full architecture
/// description.
///
/// * `timeout` - passed through to `receive_report_from_stream`.
/// * `stream` - buffered async reader carrying the collector's output.
///
/// Returns `Ok(())` when no crash was reported. A partially received report is
/// still processed and uploaded after a diagnostic line is written to stderr.
async fn receiver_entry_point(
    timeout: Duration,
    stream: impl AsyncBufReadExt + std::marker::Unpin,
) -> anyhow::Result<()> {
    let (config, mut crash_info) = match receive_report_from_stream(timeout, stream).await? {
        // Nothing to report.
        CrashReportStatus::NoCrash => return Ok(()),
        CrashReportStatus::CrashReport(config, crash_info) => (config, crash_info),
        CrashReportStatus::PartialCrashReport(config, crash_info, stdin_state) => {
            eprintln!("Failed to fully receive crash. Exit state was: {stdin_state:?}");
            (config, crash_info)
        }
    };

    // Complete and partial reports share the same resolve-then-upload path.
    resolve_frames(&config, &mut crash_info)?;
    crash_info.async_upload_to_endpoint(&config.endpoint).await
}

/// Returns how long the receiver should wait for a crash report.
///
/// The value is read, in milliseconds, from the
/// `DD_CRASHTRACKER_RECEIVER_TIMEOUT_MS` environment variable. If the variable
/// is unset or does not parse as an unsigned integer, 4000 ms is used.
fn receiver_timeout() -> Duration {
    // https://github.com/DataDog/libdatadog/issues/717
    const DEFAULT_TIMEOUT_MS: u64 = 4000;
    let millis = std::env::var("DD_CRASHTRACKER_RECEIVER_TIMEOUT_MS")
        .ok()
        .and_then(|raw| raw.parse::<u64>().ok())
        .unwrap_or(DEFAULT_TIMEOUT_MS);
    Duration::from_millis(millis)
}

/// Resolves symbol names in `crash_info` when the configuration asks for
/// symbolication to happen in the receiver.
///
/// Does nothing unless `config.resolve_frames` is
/// `StacktraceCollection::EnabledWithSymbolsInReceiver`.
///
/// # Errors
/// Fails if the crash info carries no process information (and therefore no
/// PID), or if name resolution itself reports an error.
fn resolve_frames(
    config: &CrashtrackerConfiguration,
    crash_info: &mut CrashInfo,
) -> anyhow::Result<()> {
    if config.resolve_frames != StacktraceCollection::EnabledWithSymbolsInReceiver {
        return Ok(());
    }

    // Copy the pid out first so the shared borrow of `crash_info` ends before
    // the resolving call below.
    let pid = crash_info
        .proc_info
        .as_ref()
        .context("Unable to resolve frames: No PID specified")?
        .pid;
    crash_info.resolve_names_from_process(pid)?;
    Ok(())
}
102 changes: 102 additions & 0 deletions crashtracker/src/receiver/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0
#![cfg(unix)]

mod entry_points;
pub use entry_points::{
async_receiver_entry_point_unix_socket, receiver_entry_point_stdin,
receiver_entry_point_unix_socket,
};
mod receive_report;

#[cfg(test)]
mod tests {
use super::receive_report::*;
use crate::shared::constants::*;
use crate::{CrashtrackerConfiguration, SigInfo, StacktraceCollection};
use std::time::Duration;
use tokio::io::{AsyncWriteExt, BufReader};
use tokio::net::UnixStream;

/// Writes `msg` followed by a newline to `target`, then flushes the stream.
///
/// Returns the number of bytes sent, i.e. the length of the message plus the
/// trailing newline.
///
/// # Errors
/// Propagates any I/O error from writing or flushing.
async fn to_socket(
    target: &mut tokio::net::UnixStream,
    msg: impl AsRef<str>,
) -> anyhow::Result<usize> {
    let line = format!("{}\n", msg.as_ref());
    // `write` is allowed to perform a short write and leave part of the line
    // unsent; `write_all` keeps writing until the whole buffer is written.
    target.write_all(line.as_bytes()).await?;
    target.flush().await?;
    Ok(line.len())
}

/// Plays the collector side of the protocol for the tests: sends a signal-info
/// section and a configuration section over `stream`, sleeps for `delay`, and
/// then sends the final "done" marker.
///
/// # Errors
/// Fails if serializing a payload, building the configuration, or writing to
/// the socket fails.
async fn send_report(delay: Duration, mut stream: UnixStream) -> anyhow::Result<()> {
    // Signal information section.
    to_socket(&mut stream, DD_CRASHTRACK_BEGIN_SIGINFO).await?;
    let siginfo = SigInfo {
        signame: Some("SIGSEGV".to_string()),
        signum: 11,
        faulting_address: None,
    };
    to_socket(&mut stream, serde_json::to_string(&siginfo)?).await?;
    to_socket(&mut stream, DD_CRASHTRACK_END_SIGINFO).await?;

    // Configuration section.
    to_socket(&mut stream, DD_CRASHTRACK_BEGIN_CONFIG).await?;
    let config = CrashtrackerConfiguration::new(
        vec![],
        false,
        false,
        None,
        StacktraceCollection::Disabled,
        3000,
        None,
    )?;
    to_socket(&mut stream, serde_json::to_string(&config)?).await?;
    to_socket(&mut stream, DD_CRASHTRACK_END_CONFIG).await?;

    // Stall before the final marker so the tests can exercise the receiver's timeout.
    tokio::time::sleep(delay).await;
    to_socket(&mut stream, DD_CRASHTRACK_DONE).await?;
    Ok(())
}

#[tokio::test]
#[cfg_attr(miri, ignore)]
async fn test_receive_report_short_timeout() -> anyhow::Result<()> {
let (sender, receiver) = tokio::net::UnixStream::pair()?;

let join_handle1 = tokio::spawn(receive_report_from_stream(
Duration::from_secs(1),
BufReader::new(receiver),
));
let join_handle2 = tokio::spawn(send_report(Duration::from_secs(2), sender));

let crash_report = join_handle1.await??;
assert!(matches!(
crash_report,
CrashReportStatus::PartialCrashReport(_, _, _)
));
let sender_error = join_handle2.await?.unwrap_err().to_string();
assert_eq!(sender_error, "Broken pipe (os error 32)");
Ok(())
}

#[tokio::test]
#[cfg_attr(miri, ignore)]
async fn test_receive_report_long_timeout() -> anyhow::Result<()> {
let (sender, receiver) = tokio::net::UnixStream::pair()?;

let join_handle1 = tokio::spawn(receive_report_from_stream(
Duration::from_secs(2),
BufReader::new(receiver),
));
let join_handle2 = tokio::spawn(send_report(Duration::from_secs(1), sender));

let crash_report = join_handle1.await??;
assert!(matches!(crash_report, CrashReportStatus::CrashReport(_, _)));
join_handle2.await??;
Ok(())
}
}
Loading
Loading