Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ documentation = "https://docs.rs/linux-perf-data/"
repository = "https://github.com/mstange/linux-perf-data/"
exclude = ["/.github", "/.vscode", "/tests"]

[features]
default = ["zstd"]
zstd = ["zstd-safe"]

[dependencies]
byteorder = "1.4.3"
memchr = "2.4.1"
Expand All @@ -21,6 +25,7 @@ linux-perf-event-reader = "0.10.0"
linear-map = "1.2.0"
prost = { version = "0.14", default-features = false, features = ["std"] }
prost-derive = "0.14"
zstd-safe = { version = "7.2", optional = true }

[dev-dependencies]
yaxpeax-arch = { version = "0.3", default-features = false }
Expand Down
1 change: 1 addition & 0 deletions src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub const PERF_RECORD_EVENT_UPDATE: u32 = 78;
pub const PERF_RECORD_TIME_CONV: u32 = 79;
pub const PERF_RECORD_HEADER_FEATURE: u32 = 80;
pub const PERF_RECORD_COMPRESSED: u32 = 81;
pub const PERF_RECORD_COMPRESSED2: u32 = 83;

// pub const SIMPLE_PERF_RECORD_TYPE_START: u32 = 32768;

Expand Down
69 changes: 69 additions & 0 deletions src/decompression.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
use zstd_safe::{DCtx, InBuffer, OutBuffer};

/// A zstd decompressor for PERF_RECORD_COMPRESSED records.
pub struct ZstdDecompressor {
dctx: Option<DCtx<'static>>,
/// Buffer for partial perf records that span multiple compressed chunks
partial_record_buffer: Vec<u8>,
}

impl Default for ZstdDecompressor {
fn default() -> Self {
Self::new()
}
}

impl ZstdDecompressor {
pub fn new() -> Self {
Self {
dctx: None,
partial_record_buffer: Vec::new(),
}
}

/// Decompress a chunk of zstd data.
pub fn decompress(&mut self, compressed_data: &[u8]) -> Result<Vec<u8>, std::io::Error> {
let dctx = self.dctx.get_or_insert_with(DCtx::create);

let mut decompressed = vec![0; compressed_data.len() * 4];
let mut in_buffer = InBuffer::around(compressed_data);
let mut total_out = 0;

while in_buffer.pos < in_buffer.src.len() {
let available = decompressed.len() - total_out;
let mut out_buffer = OutBuffer::around(&mut decompressed[total_out..]);

match dctx.decompress_stream(&mut out_buffer, &mut in_buffer) {
Ok(_) => {
total_out += out_buffer.pos();
if out_buffer.pos() == available {
decompressed.resize(decompressed.len() + compressed_data.len() * 4, 0);
}
}
Err(code) => {
let error_name = zstd_safe::get_error_name(code);
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("Zstd decompression failed: {}", error_name),
));
}
}
}

decompressed.truncate(total_out);

// Prepend any partial record data from the previous chunk
if !self.partial_record_buffer.is_empty() {
let mut combined = std::mem::take(&mut self.partial_record_buffer);
combined.extend_from_slice(&decompressed);
decompressed = combined;
}

Ok(decompressed)
}

/// Save partial record data that spans to the next compressed chunk.
pub fn save_partial_record(&mut self, data: &[u8]) {
self.partial_record_buffer = data.to_vec();
}
}
6 changes: 6 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ pub enum Error {

#[error("The specified size in the perf event header was smaller than the header itself")]
InvalidPerfEventSize,

#[error("Cannot parse non-streaming perf.data file with parse_pipe. Use parse_file instead.")]
FileFormatDetectedInPipeMode,

#[error("Detected pipe format in file mode")]
PipeFormatDetectedInFileMode,
}

impl From<std::str::Utf8Error> for Error {
Expand Down
34 changes: 34 additions & 0 deletions src/feature_sections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,40 @@ impl SampleTimeRange {
}
}

/// Information about compression used in the perf.data file.
#[derive(Debug, Clone, Copy)]
pub struct CompressionInfo {
pub version: u32,
/// Compression algorithm type. 1 = Zstd
pub type_: u32,
/// Compression level (e.g., 1-22 for Zstd)
pub level: u32,
/// Compression ratio achieved
pub ratio: u32,
/// mmap buffer size
pub mmap_len: u32,
}

impl CompressionInfo {
pub const STRUCT_SIZE: usize = 4 + 4 + 4 + 4 + 4;
pub const ZSTD_TYPE: u32 = 1;

pub fn parse<R: Read, T: ByteOrder>(mut reader: R) -> Result<Self, std::io::Error> {
let version = reader.read_u32::<T>()?;
let type_ = reader.read_u32::<T>()?;
let level = reader.read_u32::<T>()?;
let ratio = reader.read_u32::<T>()?;
let mmap_len = reader.read_u32::<T>()?;
Ok(Self {
version,
type_,
level,
ratio,
mmap_len,
})
}
}

pub struct HeaderString;

impl HeaderString {
Expand Down
106 changes: 101 additions & 5 deletions src/file_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ use linux_perf_event_reader::{
use std::collections::{HashMap, VecDeque};
use std::io::{Cursor, Read, Seek, SeekFrom};

#[cfg(feature = "zstd")]
use crate::decompression::ZstdDecompressor;

use super::error::{Error, ReadError};
use super::feature_sections::AttributeDescription;
use super::features::Feature;
Expand Down Expand Up @@ -58,7 +61,15 @@ pub struct PerfFileReader<R: Read> {

impl<C: Read + Seek> PerfFileReader<C> {
pub fn parse_file(mut cursor: C) -> Result<Self, Error> {
let header = PerfHeader::parse(&mut cursor)?;
let header = match PerfHeader::parse(&mut cursor) {
Ok(header) => header,
Err(Error::PipeFormatDetectedInFileMode) => {
// Rewind and parse as pipe format instead
cursor.seek(SeekFrom::Start(0))?;
return Self::parse_pipe(cursor);
}
Err(e) => return Err(e),
};
match &header.magic {
b"PERFILE2" => {
Self::parse_file_impl::<LittleEndian>(cursor, header, Endianness::LittleEndian)
Expand Down Expand Up @@ -196,6 +207,8 @@ impl<C: Read + Seek> PerfFileReader<C> {
buffers_for_recycling: VecDeque::new(),
current_event_body: Vec::new(),
pending_first_record: None,
#[cfg(feature = "zstd")]
zstd_decompressor: ZstdDecompressor::new(),
};

Ok(Self {
Expand Down Expand Up @@ -366,6 +379,8 @@ impl<R: Read> PerfFileReader<R> {
buffers_for_recycling: VecDeque::new(),
current_event_body: Vec::new(),
pending_first_record,
#[cfg(feature = "zstd")]
zstd_decompressor: ZstdDecompressor::new(),
};

Ok(Self {
Expand All @@ -391,6 +406,9 @@ pub struct PerfRecordIter<R: Read> {
buffers_for_recycling: VecDeque<Vec<u8>>,
/// For pipe mode: the first non-metadata record that was read during initialization
pending_first_record: Option<(PerfEventHeader, Vec<u8>)>,
/// Zstd decompressor for handling COMPRESSED records
#[cfg(feature = "zstd")]
zstd_decompressor: ZstdDecompressor,
}

impl<R: Read> PerfRecordIter<R> {
Expand Down Expand Up @@ -459,9 +477,9 @@ impl<R: Read> PerfRecordIter<R> {
}
self.read_offset += u64::from(header.size);

if UserRecordType::try_from(RecordType(header.type_))
== Some(UserRecordType::PERF_FINISHED_ROUND)
{
let user_record_type = UserRecordType::try_from(RecordType(header.type_));

if user_record_type == Some(UserRecordType::PERF_FINISHED_ROUND) {
self.sorter.finish_round();
if self.sorter.has_more() {
// The sorter is non-empty. We're done.
Expand All @@ -476,7 +494,6 @@ impl<R: Read> PerfRecordIter<R> {
let event_body_len = size - PerfEventHeader::STRUCT_SIZE;
let mut buffer = self.buffers_for_recycling.pop_front().unwrap_or_default();
buffer.resize(event_body_len, 0);

// Try to read the event body. For pipe mode, EOF here also means end of stream.
match self.reader.read_exact(&mut buffer) {
Ok(()) => {}
Expand All @@ -491,6 +508,28 @@ impl<R: Read> PerfRecordIter<R> {
}
}

if user_record_type == Some(UserRecordType::PERF_COMPRESSED) {
// PERF_COMPRESSED is the old format, not yet implemented
return Err(Error::IoError(std::io::Error::new(
std::io::ErrorKind::Unsupported,
"PERF_COMPRESSED (type 81) is not supported yet, only PERF_COMPRESSED2 (type 83)",
)));
}

if user_record_type == Some(UserRecordType::PERF_COMPRESSED2) {
#[cfg(not(feature = "zstd"))]
{
return Err(Error::IoError(std::io::Error::new(std::io::ErrorKind::Unsupported,
"Compression support is not enabled. Please rebuild with the 'zstd' feature flag.",
)));
}
#[cfg(feature = "zstd")]
{
self.decompress_and_process_compressed2::<T>(&buffer)?;
continue;
}
}

self.process_record::<T>(header, buffer, offset)?;
}

Expand Down Expand Up @@ -542,7 +581,64 @@ impl<R: Read> PerfRecordIter<R> {
attr_index,
};
self.sorter.insert_unordered(sort_key, pending_record);
Ok(())
}

/// Decompresses a PERF_RECORD_COMPRESSED2 record and processes its sub-records.
#[cfg(feature = "zstd")]
fn decompress_and_process_compressed2<T: ByteOrder>(
&mut self,
buffer: &[u8],
) -> Result<(), Error> {
if buffer.len() < 8 {
return Err(ReadError::PerfEventData.into());
}
let data_size = T::read_u64(&buffer[0..8]) as usize;
if data_size > buffer.len() - 8 {
return Err(ReadError::PerfEventData.into());
}
let compressed_data = &buffer[8..8 + data_size];

let decompressed = self.zstd_decompressor.decompress(compressed_data)?;

// Parse the decompressed data as a sequence of perf records
let mut cursor = Cursor::new(&decompressed[..]);
let mut offset = 0u64;

while (cursor.position() as usize) < decompressed.len() {
let header_start = cursor.position() as usize;
// Check if we have enough bytes for a header
let remaining = decompressed.len() - header_start;
if remaining < PerfEventHeader::STRUCT_SIZE {
self.zstd_decompressor
.save_partial_record(&decompressed[header_start..]);
break;
}

let sub_header = PerfEventHeader::parse::<_, T>(&mut cursor)?;
let sub_size = sub_header.size as usize;
if sub_size < PerfEventHeader::STRUCT_SIZE {
return Err(Error::InvalidPerfEventSize);
}

let sub_event_body_len = sub_size - PerfEventHeader::STRUCT_SIZE;
// Check if we have enough bytes for the sub-record body
let remaining_after_header = decompressed.len() - cursor.position() as usize;
if sub_event_body_len > remaining_after_header {
self.zstd_decompressor
.save_partial_record(&decompressed[header_start..]);
break;
}

let mut sub_buffer = self.buffers_for_recycling.pop_front().unwrap_or_default();
sub_buffer.resize(sub_event_body_len, 0);
cursor
.read_exact(&mut sub_buffer)
.map_err(|_| ReadError::PerfEventData)?;

self.process_record::<T>(sub_header, sub_buffer, offset)?;
offset += sub_size as u64;
}
Ok(())
}

Expand Down
24 changes: 18 additions & 6 deletions src/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::io::Read;

use byteorder::{ByteOrder, ReadBytesExt};

use super::error::Error;
use super::features::FeatureSet;
use super::section::PerfFileSection;

Expand All @@ -28,7 +29,7 @@ pub struct PerfHeader {
}

impl PerfHeader {
pub fn parse<R: Read>(mut reader: R) -> Result<Self, std::io::Error> {
pub fn parse<R: Read>(mut reader: R) -> Result<Self, Error> {
let mut magic = [0; 8];
reader.read_exact(&mut magic)?;

Expand All @@ -39,11 +40,14 @@ impl PerfHeader {
}
}

fn parse_impl<R: Read, T: ByteOrder>(
mut reader: R,
magic: [u8; 8],
) -> Result<Self, std::io::Error> {
fn parse_impl<R: Read, T: ByteOrder>(mut reader: R, magic: [u8; 8]) -> Result<Self, Error> {
let header_size = reader.read_u64::<T>()?;

// Detect if this is actually a pipe format instead of file format.
if header_size == std::mem::size_of::<PerfPipeHeader>() as u64 {
return Err(Error::PipeFormatDetectedInFileMode);
}

let attr_size = reader.read_u64::<T>()?;
let attr_section = PerfFileSection::parse::<_, T>(&mut reader)?;
let data_section = PerfFileSection::parse::<_, T>(&mut reader)?;
Expand Down Expand Up @@ -81,7 +85,7 @@ pub struct PerfPipeHeader {
}

impl PerfPipeHeader {
pub fn parse<R: Read>(mut reader: R) -> Result<Self, std::io::Error> {
pub fn parse<R: Read>(mut reader: R) -> Result<Self, Error> {
let mut magic = [0; 8];
reader.read_exact(&mut magic)?;

Expand All @@ -90,6 +94,14 @@ impl PerfPipeHeader {
} else {
reader.read_u64::<byteorder::BigEndian>()?
};

// Detect if this is actually a file format instead of pipe format.
if size > std::mem::size_of::<Self>() as u64
&& size == std::mem::size_of::<PerfHeader>() as u64
{
return Err(Error::FileFormatDetectedInPipeMode);
}

Ok(Self { magic, size })
}
}
4 changes: 3 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@

mod build_id_event;
mod constants;
#[cfg(feature = "zstd")]
mod decompression;
mod dso_info;
mod dso_key;
mod error;
Expand Down Expand Up @@ -91,7 +93,7 @@ pub use linux_perf_event_reader::Endianness;
pub use dso_info::DsoInfo;
pub use dso_key::DsoKey;
pub use error::{Error, ReadError};
pub use feature_sections::{AttributeDescription, NrCpus, SampleTimeRange};
pub use feature_sections::{AttributeDescription, CompressionInfo, NrCpus, SampleTimeRange};
pub use features::{Feature, FeatureSet, FeatureSetIter};
pub use file_reader::{PerfFileReader, PerfRecordIter};
pub use perf_file::PerfFile;
Expand Down
Loading