Walkthrough

Adjusted catalog retention cleanup to log errors instead of propagating them; refactored the storage metrics wrapper by replacing a const-generic labels array with explicit provider/method/status fields and updated metric recording. Also updated an assets SHA1 checksum in Cargo.toml.
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ❌ 3 failed checks (2 warnings, 1 inconclusive)
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/catalog/mod.rs (1)
536-558: ⚠️ Potential issue | 🟠 Major

Do not silently succeed when retention cleanup fanout fails.

Line 542 dispatches through `for_each_live_node` (which includes queriers in `Mode::Prism`), and Line 558 logs then discards all errors. If any ingestor cleanup fails, this path still returns success after local snapshot mutation, which can leave cluster state inconsistent.

Suggested fix direction:
```diff
-        let _ = for_each_live_node(tenant_id, move |ingestor| {
+        // fan out only to ingestors, then propagate failures
+        if let Err(err) = for_each_live_node(tenant_id, move |ingestor| {
             let stream_name = stream_name_clone.clone();
             let dates = dates_clone.clone();
             async move {
                 let url = format!(
                     "{}{}/logstream/{}/retention/cleanup",
                     ingestor.domain_name,
                     base_path_without_preceding_slash(),
                     stream_name
                 );
                 handlers::http::cluster::send_retention_cleanup_request(&url, ingestor, &dates)
                     .await?;
                 Ok::<(), ObjectStorageError>(())
             }
         })
-        .await
-        .map_err(|e| tracing::error!("{e}"));
+        .await
+        {
+            tracing::error!(error = ?err, stream = %stream_name_clone, dates = ?dates_clone, "retention cleanup fanout failed");
+            return Err(err);
+        }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/catalog/mod.rs` around lines 536-558: the fanout to other nodes uses `for_each_live_node` and currently logs and swallows errors (`map_err(|e| tracing::error!("{e}"))`), which can hide failed ingestor cleanup. Change the logic to (1) only target ingestors (not queriers) when dispatching retention cleanup: filter nodes by role or use an ingestor-specific helper instead of `for_each_live_node` if available; and (2) propagate any error instead of discarding it: await the future and return `Err(...)` or use `?` so the caller observes failures from `handlers::http::cluster::send_retention_cleanup_request`. Update the closure/return types around `send_retention_cleanup_request`, `for_each_live_node`, and the surrounding function so errors bubble up rather than being logged and ignored.
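The fix direction above boils down to one Rust pattern: use `?` (or an explicit `return Err`) instead of `let _ = … .map_err(log)`. A minimal, self-contained sketch of the difference, where `cleanup_node` is a hypothetical stand-in for the per-ingestor request:

```rust
// `cleanup_node` simulates one ingestor cleanup request: Ok on success,
// Err when the ingestor is unreachable.
fn cleanup_node(ok: bool) -> Result<(), String> {
    if ok { Ok(()) } else { Err("ingestor unreachable".to_string()) }
}

// Swallowing variant, mirroring the original code: the error is logged,
// then discarded, so the caller always sees success.
fn cleanup_swallowed(ok: bool) -> Result<(), String> {
    let _ = cleanup_node(ok).map_err(|e| eprintln!("error: {e}"));
    Ok(()) // failure is invisible to the caller
}

// Propagating variant, mirroring the suggested fix: log context, then
// bubble the error up with `?`.
fn cleanup_propagated(ok: bool) -> Result<(), String> {
    cleanup_node(ok).map_err(|e| {
        eprintln!("retention cleanup fanout failed: {e}");
        e
    })?;
    Ok(())
}

fn main() {
    assert!(cleanup_swallowed(false).is_ok()); // failure hidden
    assert!(cleanup_propagated(false).is_err()); // failure visible
    assert!(cleanup_propagated(true).is_ok());
    println!("ok");
}
```

The same shape applies inside the async fanout: `if let Err(err) = fut.await { … return Err(err); }` keeps the log line while still letting the HTTP handler report the failure.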
🧹 Nitpick comments (1)
src/storage/metrics_layer.rs (1)
294-299: `LIST`/`LIST_OFFSET` metrics still use a hard-coded success status.

Line 298 and Line 315 set `status` to `"200"`, and it is never updated before emission on Line 421. That makes list failures indistinguishable from successful calls in latency metrics.

Possible refactor:
```diff
-struct StreamMetricWrapper<'a, T> {
+struct StreamMetricWrapper<'a> {
     time: time::Instant,
     provider: String,
     method: &'static str,
-    status: &'static str,
-    inner: BoxStream<'a, T>,
+    status: &'static str,
+    inner: BoxStream<'a, ObjectStoreResult<ObjectMeta>>,
 }

-impl<T> Stream for StreamMetricWrapper<'_, T> {
-    type Item = T;
+impl Stream for StreamMetricWrapper<'_> {
+    type Item = ObjectStoreResult<ObjectMeta>;

     fn poll_next(
         mut self: std::pin::Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Option<Self::Item>> {
         match self.inner.poll_next_unpin(cx) {
+            t @ Poll::Ready(Some(Err(ref err))) => {
+                STORAGE_REQUEST_RESPONSE_TIME
+                    .with_label_values(&[&self.provider, self.method, error_to_status_code(err)])
+                    .observe(self.time.elapsed().as_secs_f64());
+                t
+            }
             t @ Poll::Ready(None) => {
                 STORAGE_REQUEST_RESPONSE_TIME
                     .with_label_values(&[&self.provider, self.method, self.status])
                     .observe(self.time.elapsed().as_secs_f64());
                 t
             }
             t => t,
         }
     }
 }
```

Also applies to: 311-316, 403-422
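The refactor above hinges on labeling each observation with a status derived from the yielded item, rather than always emitting `"200"`. A std-only sketch of that labeling, where `StoreError` and `error_to_status_code` are hypothetical stand-ins for the wrapper's real error type and helper:

```rust
// A stand-in for the object-store error type the stream can yield.
#[derive(Debug)]
enum StoreError {
    NotFound,
    PermissionDenied,
    Other,
}

// Map an error to the status label the metric should carry,
// instead of the hard-coded "200".
fn error_to_status_code(err: &StoreError) -> &'static str {
    match err {
        StoreError::NotFound => "404",
        StoreError::PermissionDenied => "403",
        StoreError::Other => "500",
    }
}

// Simulate draining a list stream: record one status label per error
// item, plus a final "200" when the stream ends. In the real wrapper
// each push would be an observation on STORAGE_REQUEST_RESPONSE_TIME.
fn drain(items: Vec<Result<&'static str, StoreError>>) -> Vec<&'static str> {
    let mut statuses = Vec::new();
    for item in items {
        match item {
            Ok(_) => {} // successful item, keep polling
            Err(ref e) => statuses.push(error_to_status_code(e)),
        }
    }
    statuses.push("200"); // stream exhausted
    statuses
}

fn main() {
    let observed = drain(vec![Ok("a"), Err(StoreError::NotFound), Ok("b")]);
    assert_eq!(observed, vec!["404", "200"]);
    println!("ok");
}
```

With this shape, a failed list call shows up under its error status in the latency histogram, which is exactly the distinction the original hard-coded `"200"` erased.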