feat(temporal_reaggregation_processor): Basic aggregations for OTAP payloads #2444
JakeDern wants to merge 24 commits into open-telemetry:main
Conversation
Codecov Report

❌ Patch coverage — additional details and impacted files:

```
@@            Coverage Diff             @@
##              main    #2444     +/-   ##
==========================================
- Coverage    88.32%   88.27%   -0.06%
==========================================
  Files          603      607       +4
  Lines       212222   216568    +4346
==========================================
+ Hits        187444   191165    +3721
- Misses       24252    24877     +625
  Partials       526      526
```
...otap-dataflow/crates/core-nodes/src/processors/temporal_reaggregation_processor/aggregate.rs
...flow/crates/core-nodes/src/processors/temporal_reaggregation_processor/data_point_builder.rs
```toml
weaver_resolved_schema = { git = "https://github.com/open-telemetry/weaver.git", tag = "v0.21.2" }
weaver_resolver = { git = "https://github.com/open-telemetry/weaver.git", tag = "v0.21.2" }
weaver_semconv = { git = "https://github.com/open-telemetry/weaver.git", tag = "v0.21.2" }
xxhash-rust = { version = "0.8", features = ["xxh3"] }
```
This has a BSL 1.0 license. It seems to be pretty permissive, and I think it would be safe to allow: https://github.com/doumanash/xxhash-rust?tab=BSL-1.0-1-ov-file

If not, we can definitely try to find a different library for a 128-bit hash, or we can compute two independent 64-bit hashes and combine them, which is what the Go implementation does.
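As a std-only sketch of the combine-two-64-bit-hashes idea (the hasher and seeds here are placeholders for illustration; the actual code would use xxh3 from xxhash-rust):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Placeholder 64-bit hash: mix a seed and the input bytes through the std
// hasher. A real implementation would use a proper keyed hash (e.g. xxh3).
fn hash64(seed: u64, bytes: &[u8]) -> u64 {
    let mut h = DefaultHasher::new();
    seed.hash(&mut h);
    bytes.hash(&mut h);
    h.finish()
}

// Combine two independently seeded 64-bit hashes into one 128-bit value,
// mirroring the approach described for the Go implementation.
fn hash128(bytes: &[u8]) -> u128 {
    ((hash64(0, bytes) as u128) << 64) | hash64(1, bytes) as u128
}

fn main() {
    let a = hash128(b"metric-identity");
    let b = hash128(b"metric-identity");
    assert_eq!(a, b); // deterministic for the same input
    println!("{a:032x}");
}
```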
To clarify - BSL 1.0 in this case is "Boost 1.0" not "Business Source License"
@jmacd Any restrictions you know of on Boost 1.0 license? Our internal guidance allows it explicitly; I can add it to the list if there are no other restrictions.
Yes, I think this is OK, however please also note:
- The issue [Rust-CI Follow-up] Check with CNCF legal over use of Unicode license #313 is gaining weight. I think we should go ahead, and if someone thinks we should change this, address it later. I think BSL-1.0 is OK, however we have to make a couple of updates.
- The current `NOTICE` file is a little unclear; we might call this `THIRD_PARTY_NOTICES.txt`. We should add the BSL-1.0 requirement for xxhash-rs into this file.
- The `df_engine` I think has to comply with this as well; we need to add a `--license` mode that prints the inlined notice.
Took care of the first two points in this PR; maybe we can follow up with another PR to add the `--license` mode for `df_engine`, if that's OK.
lquerel left a comment:
I think we should revisit how the hashed value is serialized to avoid collisions.
rust/otap-dataflow/crates/core-nodes/src/processors/temporal_reaggregation_processor/mod.rs
rust/otap-dataflow/crates/core-nodes/src/processors/temporal_reaggregation_processor/config.rs
...otap-dataflow/crates/core-nodes/src/processors/temporal_reaggregation_processor/aggregate.rs
```rust
for attr in &buf.entries {
    buf.buf.push(HashTag::Key as u8);
    buf.buf.extend_from_slice(attr.key());
    write_attr_value(&mut buf.buf, attr);
}
```
Here we have a collision issue. I'm not talking about a probabilistic collision, but a deterministic alias caused by an ambiguous encoding. With this algorithm,

```
{ a = bytes([F4 62 F5]) }  and  { a = bytes([]), b = empty }
```

will have the same encoding, byte for byte. Adding the length as a prefix for each encoded value will probably fix this issue.
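To make the alias concrete, here is a small self-contained sketch (the tag values and names are made up for illustration, not the actual ones in this PR) showing two different attribute maps producing byte-identical encodings under the unprefixed scheme:

```rust
// Hypothetical tag values for illustration only.
const KEY_TAG: u8 = 0xF4;
const BYTES_TAG: u8 = 0xF6;

// Unprefixed encoding: tag + raw bytes, as in the snippet above.
fn encode_unprefixed(attrs: &[(&[u8], &[u8])]) -> Vec<u8> {
    let mut out = Vec::new();
    for (key, val) in attrs {
        out.push(KEY_TAG);
        out.extend_from_slice(key);
        out.push(BYTES_TAG);
        out.extend_from_slice(val);
    }
    out
}

fn main() {
    // { a = bytes([F4 62 F6]) }  vs  { a = bytes([]), b = bytes([]) }
    let empty: &[u8] = &[];
    let x = encode_unprefixed(&[(b"a".as_slice(), [0xF4u8, 0x62, 0xF6].as_slice())]);
    let y = encode_unprefixed(&[(b"a".as_slice(), empty), (b"b".as_slice(), empty)]);
    assert_eq!(x, y); // byte-for-byte identical: a deterministic collision
}
```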
Yikes, yeah, let me think about how to avoid this. We should probably also patch this in Go, because I adapted it from there and it's the same scheme: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/pdatautil/hash.go
They actually write a suffix as well, which I originally missed, but I don't think it matters. Slightly tweaked collision for the FF suffix:

```
{ a = bytes([FF F4 62 F6 BB]) }      => F4 61 F6 FF F4 62 F6 BB FF
{ a = bytes([]), b = bytes([BB]) }   => F4 61 F6 FF F4 62 F6 BB FF
```

(updated with an easier example)
(updated again, because I forgot a byte, but it doesn't matter)
I think the length prefix for the byte arrays will work, but I'm also wondering if we should solve this for keys or not. I think if keys are guaranteed to be UTF-8 and we shift the tags into the F5-FF range, then we'd be OK there. Otherwise it's trivial to create the same collision:

```
{ "a\xF6" = bytes([]) }   => F4 61 F6 F6 FF
{ "a" = bytes([F6]) }     => F4 61 F6 F6 FF
```

But there are a few layers to consider on the "guaranteed to be UTF-8" part...
- Our views currently define strings as `pub type Str<'src> = &'src [u8];`. It's reasonable to argue that we therefore can't assume anything about encoding, because anyone could implement the trait as narrowly as it's defined.
- Ignoring that for a second, let's look at the existing implementation and ask, "can non-UTF-8 characters make it to this point in the program?"

For Arrow we have some guarantees, as the source data was already parsed as a string:
```rust
#[inline(always)]
pub(crate) fn get_attribute_key<'a>(
    attr_key: &'a StringArrayAccessor<'a>,
    row_idx: usize,
) -> Option<&'a [u8]> {
    attr_key.str_at(row_idx).map(|s| s.as_bytes())
}
```

For OTLP, I'm not sure we have such a guarantee, because we're just slicing some byte buffer. You could argue that protobuf defines this field as a string and therefore these bytes must be UTF-8, but someone could create a buggy or malicious implementation that does not do this, and therefore we need to check before assuming:
```rust
fn key(&self) -> otap_df_pdata_views::views::common::Str<'_> {
    loop {
        if let Some((start, end)) = from_option_nonzero_range_to_primitive(self.key_range.get())
        {
            return &self.buf[start..end];
        } else if self.pos.get() >= self.buf.len() {
            break;
        } else {
            self.advance();
        }
    }
    // return empty string when we cannot read the key
    &[]
}
```

I think my opinion here is that someone could manufacture collisions, which may or may not hurt depending on the scenario, so we should either:
- Redefine the view type as `pub type Str<'src> = &'src str;`, OR
- Put a length field in front of the keys as well.
Note: the OpenTelemetry specification only says that attribute names must be valid Unicode, and explicitly calls out that there is no canonical encoding. This is not really relevant here, but just pointing it out: https://opentelemetry.io/docs/specs/semconv/general/naming/#general-naming-considerations
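As an illustration of the first option (the function name is hypothetical, not from this PR), validating UTF-8 at the view boundary would look roughly like:

```rust
// Hypothetical sketch: reject non-UTF-8 key bytes at the view boundary so
// downstream hashing can work with &str. Falls back to an empty key, which
// mirrors the existing "return empty string when cannot read key" behavior.
fn key_as_str(raw: &[u8]) -> &str {
    std::str::from_utf8(raw).unwrap_or("")
}

fn main() {
    assert_eq!(key_as_str(b"service.name"), "service.name");
    assert_eq!(key_as_str(&[0x61, 0xF6]), ""); // "a\xF6" is not valid UTF-8
}
```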
Pushing a 4-byte length prefix for both string/byte-array keys and values for now.
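A minimal sketch of what that fix could look like (tag values, length width, and endianness are assumptions for illustration, not necessarily what the PR does): prefixing every key and value with its 4-byte length removes the ambiguity, because each entry's boundaries are now explicit in the encoding.

```rust
// Hypothetical tag values for illustration only.
const KEY_TAG: u8 = 0xF4;
const BYTES_TAG: u8 = 0xF6;

// Length-prefixed encoding: tag + 4-byte big-endian length + raw bytes.
fn encode_len_prefixed(attrs: &[(&[u8], &[u8])]) -> Vec<u8> {
    let mut out = Vec::new();
    for (key, val) in attrs {
        out.push(KEY_TAG);
        out.extend_from_slice(&(key.len() as u32).to_be_bytes());
        out.extend_from_slice(key);
        out.push(BYTES_TAG);
        out.extend_from_slice(&(val.len() as u32).to_be_bytes());
        out.extend_from_slice(val);
    }
    out
}

fn main() {
    // The pair that collided without length prefixes now encodes differently.
    let empty: &[u8] = &[];
    let x = encode_len_prefixed(&[(b"a".as_slice(), [0xF4u8, 0x62, 0xF6].as_slice())]);
    let y = encode_len_prefixed(&[(b"a".as_slice(), empty), (b"b".as_slice(), empty)]);
    assert_ne!(x, y);
}
```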
Go fix here: open-telemetry/opentelemetry-collector-contrib#47242
.../otap-dataflow/crates/core-nodes/src/processors/temporal_reaggregation_processor/identity.rs
@lquerel Here is a list of things not included in this PR that I currently have marked for follow-up, in very approximate order:
Change Summary
This is Part 2 of the temporal reaggregation processor, adding basic aggregation ability and a whole lot of plumbing. There are still many things to fix and implement, but this seemed like a good checkpoint.
This PR lets us:
What issue does this PR close?
How are these changes tested?
Unit tests.
Are there any user-facing changes?
No.