diff --git a/apps/labrinth/.sqlx/query-0f3d943e4fc48a94363b77c8a7d36eb1dd626e77331d8278c406df952691be4c.json b/apps/labrinth/.sqlx/query-0f3d943e4fc48a94363b77c8a7d36eb1dd626e77331d8278c406df952691be4c.json new file mode 100644 index 0000000000..4bc87e73a5 --- /dev/null +++ b/apps/labrinth/.sqlx/query-0f3d943e4fc48a94363b77c8a7d36eb1dd626e77331d8278c406df952691be4c.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT COUNT(*) FROM payouts_values_notifications WHERE notified = FALSE AND user_id = $1", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "count", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [ + null + ] + }, + "hash": "0f3d943e4fc48a94363b77c8a7d36eb1dd626e77331d8278c406df952691be4c" +} diff --git a/apps/labrinth/.sqlx/query-1adbd24d815107e13bc1440c7a8f4eeff66ab4165a9f4980032e114db4dc1286.json b/apps/labrinth/.sqlx/query-1adbd24d815107e13bc1440c7a8f4eeff66ab4165a9f4980032e114db4dc1286.json new file mode 100644 index 0000000000..921f7f92d9 --- /dev/null +++ b/apps/labrinth/.sqlx/query-1adbd24d815107e13bc1440c7a8f4eeff66ab4165a9f4980032e114db4dc1286.json @@ -0,0 +1,26 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n id,\n status AS \"status: PayoutStatus\"\n FROM payouts\n ORDER BY id\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int8" + }, + { + "ordinal": 1, + "name": "status: PayoutStatus", + "type_info": "Varchar" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + false, + false + ] + }, + "hash": "1adbd24d815107e13bc1440c7a8f4eeff66ab4165a9f4980032e114db4dc1286" +} diff --git a/apps/labrinth/.sqlx/query-20cff8fdf7971e91c9d473b9a4663ce02ca16781e32232ae0fa7a0af1973d3a4.json b/apps/labrinth/.sqlx/query-20cff8fdf7971e91c9d473b9a4663ce02ca16781e32232ae0fa7a0af1973d3a4.json new file mode 100644 index 0000000000..3c99ff3fed --- /dev/null +++ b/apps/labrinth/.sqlx/query-20cff8fdf7971e91c9d473b9a4663ce02ca16781e32232ae0fa7a0af1973d3a4.json @@ -0,0 +1,20 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT COUNT(*) FROM payouts_values_notifications WHERE notified = FALSE", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "count", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + null + ] + }, + "hash": "20cff8fdf7971e91c9d473b9a4663ce02ca16781e32232ae0fa7a0af1973d3a4" +} diff --git a/apps/labrinth/.sqlx/query-3bf93aa9204bc5c52441e0a3789aeaae55206a459299c2e63cc303d2a7615588.json b/apps/labrinth/.sqlx/query-3bf93aa9204bc5c52441e0a3789aeaae55206a459299c2e63cc303d2a7615588.json new file mode 100644 index 0000000000..c0bc9d10eb --- /dev/null +++ b/apps/labrinth/.sqlx/query-3bf93aa9204bc5c52441e0a3789aeaae55206a459299c2e63cc303d2a7615588.json @@ -0,0 +1,23 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO delphi_report_issues (report_id, issue_type)\n VALUES ($1, $2)\n ON CONFLICT (report_id, issue_type) DO UPDATE SET\n issue_type = EXCLUDED.issue_type\n RETURNING id\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Int8", + "Text" + ] + }, + "nullable": [ + false + ] + }, + "hash": "3bf93aa9204bc5c52441e0a3789aeaae55206a459299c2e63cc303d2a7615588" +} diff --git a/apps/labrinth/.sqlx/query-6678cd4c51771cfaae2be8021ba66908ea41a06ba858dc5b523aef6aae27b850.json b/apps/labrinth/.sqlx/query-6678cd4c51771cfaae2be8021ba66908ea41a06ba858dc5b523aef6aae27b850.json new file mode 100644 index 0000000000..b4c2e5a56e --- /dev/null +++ b/apps/labrinth/.sqlx/query-6678cd4c51771cfaae2be8021ba66908ea41a06ba858dc5b523aef6aae27b850.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "INSERT INTO payouts_values_notifications (date_available, user_id, notified)\n VALUES ($1, $2, FALSE)\n ON CONFLICT (date_available, user_id) DO NOTHING", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Timestamptz", + "Int8" + ] + }, + "nullable": [] + }, + "hash": "6678cd4c51771cfaae2be8021ba66908ea41a06ba858dc5b523aef6aae27b850" +} diff --git a/apps/labrinth/.sqlx/query-69a1cb4b7f1115a990d1fc4805d58541fc78e910111c09ba3d50a12d9ca4a9f8.json b/apps/labrinth/.sqlx/query-69a1cb4b7f1115a990d1fc4805d58541fc78e910111c09ba3d50a12d9ca4a9f8.json new file mode 100644 index 0000000000..fc7d2ac98d --- /dev/null +++ b/apps/labrinth/.sqlx/query-69a1cb4b7f1115a990d1fc4805d58541fc78e910111c09ba3d50a12d9ca4a9f8.json @@ -0,0 +1,16 @@ +{ + "db_name": "PostgreSQL", + "query": "INSERT INTO payouts_values (user_id, mod_id, amount, created, date_available)\n VALUES ($1, NULL, $2, NOW(), $3)", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8", + "Numeric", + "Timestamptz" + ] + }, + "nullable": [] + }, + "hash": "69a1cb4b7f1115a990d1fc4805d58541fc78e910111c09ba3d50a12d9ca4a9f8" +} diff --git a/apps/labrinth/.sqlx/query-9b0edb6399bc5f38ee96677cb2539476ecf08549b914f96f0c23fbfbe3f17f2e.json b/apps/labrinth/.sqlx/query-9b0edb6399bc5f38ee96677cb2539476ecf08549b914f96f0c23fbfbe3f17f2e.json new file mode 100644 index 0000000000..c8aad7deda --- /dev/null +++ b/apps/labrinth/.sqlx/query-9b0edb6399bc5f38ee96677cb2539476ecf08549b914f96f0c23fbfbe3f17f2e.json @@ -0,0 +1,20 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT DISTINCT m.id\n FROM mods m\n WHERE\n EXISTS(\n SELECT 1\n FROM delphi_issue_details_with_statuses didws\n INNER JOIN delphi_report_issues dri ON dri.id = didws.issue_id\n WHERE\n didws.project_id = m.id\n AND didws.status = 'pending'\n -- see delphi.rs todo comment\n AND dri.issue_type != '__dummy'\n )\n AND NOT EXISTS(\n SELECT 1\n FROM delphi_issue_details_with_statuses didws\n INNER JOIN delphi_report_issues dri ON dri.id = didws.issue_id\n WHERE\n didws.project_id = m.id\n AND didws.status IN ('safe', 'unsafe')\n -- see delphi.rs todo comment\n AND dri.issue_type != '__dummy'\n )\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + false + ] + }, + "hash": "9b0edb6399bc5f38ee96677cb2539476ecf08549b914f96f0c23fbfbe3f17f2e" +} diff --git a/apps/labrinth/.sqlx/query-b92b5bb7d179c4fcdbc45600ccfd2402f52fea71e27b08e7926fcc2a9e62c0f3.json b/apps/labrinth/.sqlx/query-b92b5bb7d179c4fcdbc45600ccfd2402f52fea71e27b08e7926fcc2a9e62c0f3.json new file mode 100644 index 0000000000..89bd8147dc --- /dev/null +++ b/apps/labrinth/.sqlx/query-b92b5bb7d179c4fcdbc45600ccfd2402f52fea71e27b08e7926fcc2a9e62c0f3.json @@ -0,0 +1,20 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT status AS \"status: PayoutStatus\" FROM payouts WHERE id = 1", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "status: PayoutStatus", + "type_info": "Varchar" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + false + ] + }, + "hash": "b92b5bb7d179c4fcdbc45600ccfd2402f52fea71e27b08e7926fcc2a9e62c0f3" +} diff --git a/apps/labrinth/.sqlx/query-cd5ccd618fb3cc41646a6de86f9afedb074492b4ec7f2457c14113f5fd13aa02.json b/apps/labrinth/.sqlx/query-cd5ccd618fb3cc41646a6de86f9afedb074492b4ec7f2457c14113f5fd13aa02.json new file mode 100644 index 0000000000..469c30168a --- /dev/null +++ b/apps/labrinth/.sqlx/query-cd5ccd618fb3cc41646a6de86f9afedb074492b4ec7f2457c14113f5fd13aa02.json @@ -0,0 +1,18 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO payouts (id, method, platform_id, status, user_id, amount, created)\n VALUES ($1, $2, $3, $4, $5, 10.0, NOW())\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8", + "Text", + "Text", + "Varchar", + "Int8" + ] + }, + "nullable": [] + }, + "hash": "cd5ccd618fb3cc41646a6de86f9afedb074492b4ec7f2457c14113f5fd13aa02" +} diff --git a/apps/labrinth/.sqlx/query-cec4240c7c848988b3dfd13e3f8e5c93783c7641b019fdb698a1ec0be1393606.json b/apps/labrinth/.sqlx/query-cec4240c7c848988b3dfd13e3f8e5c93783c7641b019fdb698a1ec0be1393606.json new file mode 100644 index 0000000000..52e020ebf2 --- /dev/null +++ b/apps/labrinth/.sqlx/query-cec4240c7c848988b3dfd13e3f8e5c93783c7641b019fdb698a1ec0be1393606.json @@ -0,0 +1,17 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO payouts (id, method, platform_id, status, user_id, amount, created)\n VALUES ($1, $2, NULL, $3, $4, 10.00, NOW())\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8", + "Text", + "Varchar", + "Int8" + ] + }, + "nullable": [] + }, + "hash": "cec4240c7c848988b3dfd13e3f8e5c93783c7641b019fdb698a1ec0be1393606" +} diff --git a/apps/labrinth/.sqlx/query-d8566db50fd8e8c31419f1ad596d3255af3f759d9d202b8a2cec59701c8b3ba6.json b/apps/labrinth/.sqlx/query-d8566db50fd8e8c31419f1ad596d3255af3f759d9d202b8a2cec59701c8b3ba6.json new file mode 100644 index 0000000000..9709e75e48 --- /dev/null +++ b/apps/labrinth/.sqlx/query-d8566db50fd8e8c31419f1ad596d3255af3f759d9d202b8a2cec59701c8b3ba6.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT DISTINCT dr.file_id\n FROM delphi_reports dr\n INNER JOIN files f ON f.id = dr.file_id\n INNER JOIN versions v ON v.id = f.version_id\n WHERE v.mod_id = ANY($1::bigint[])\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "file_id", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Int8Array" + ] + }, + "nullable": [ + true + ] + }, + "hash": "d8566db50fd8e8c31419f1ad596d3255af3f759d9d202b8a2cec59701c8b3ba6" +} diff --git a/apps/labrinth/.sqlx/query-fd5c773a61d35bcd71503ec4d5f86e8917cfab9679d5064074681663ba467e41.json b/apps/labrinth/.sqlx/query-fd5c773a61d35bcd71503ec4d5f86e8917cfab9679d5064074681663ba467e41.json new file mode 100644 index 0000000000..d3e3520bcc --- /dev/null +++ b/apps/labrinth/.sqlx/query-fd5c773a61d35bcd71503ec4d5f86e8917cfab9679d5064074681663ba467e41.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT COUNT(*) FROM notifications WHERE user_id = $1 AND body->>'type' = 'payout_available'", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "count", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [ + null + ] + }, + "hash": "fd5c773a61d35bcd71503ec4d5f86e8917cfab9679d5064074681663ba467e41" +} diff --git a/apps/labrinth/src/database/models/delphi_report_item.rs b/apps/labrinth/src/database/models/delphi_report_item.rs index a88439d670..e975145ed9 100644 --- a/apps/labrinth/src/database/models/delphi_report_item.rs +++ b/apps/labrinth/src/database/models/delphi_report_item.rs @@ -182,6 +182,27 @@ pub struct DelphiReportIssueResult { } impl DBDelphiReportIssue { + pub async fn upsert( + &self, + transaction: &mut PgTransaction<'_>, + ) -> Result { + Ok(DelphiReportIssueId( + sqlx::query_scalar!( + " + INSERT INTO delphi_report_issues (report_id, issue_type) + VALUES ($1, $2) + ON CONFLICT (report_id, issue_type) DO UPDATE SET + issue_type = EXCLUDED.issue_type + RETURNING id + ", + self.report_id as DelphiReportId, + self.issue_type, + ) + .fetch_one(&mut *transaction) + .await?, + )) + } + pub async fn insert( &self, transaction: &mut PgTransaction<'_>, diff --git a/apps/labrinth/src/lib.rs b/apps/labrinth/src/lib.rs index 207bdf423a..46ac812904 100644 --- a/apps/labrinth/src/lib.rs +++ b/apps/labrinth/src/lib.rs @@ -19,6 +19,7 @@ use crate::database::{PgPool, ReadOnlyPgPool}; use crate::env::ENV; use crate::queue::billing::{index_billing, index_subscriptions}; use crate::queue::moderation::AutomatedModerationQueue; +use crate::routes::internal::delphi::rescan::rescan_projects_in_queue; use crate::util::anrok; use crate::util::archon::ArchonClient; use crate::util::ratelimit::{AsyncRateLimiter, GCRAParameters}; @@ -102,6 +103,15 @@ pub fn app_setup( let scheduler = scheduler::Scheduler::new(); + { + let pool_ref = pool.clone(); + actix_rt::spawn(async move { + if let Err(err) = rescan_projects_in_queue(&pool_ref).await { + warn!("Delphi rescan failed: {err:#}"); + } + }); + } + let limiter = web::Data::new(AsyncRateLimiter::new( redis_pool.clone(), GCRAParameters::new(300, 300), diff --git a/apps/labrinth/src/routes/internal/delphi.rs b/apps/labrinth/src/routes/internal/delphi/mod.rs similarity index 94% rename from apps/labrinth/src/routes/internal/delphi.rs rename to apps/labrinth/src/routes/internal/delphi/mod.rs index 20c051ca60..dc5772bc76 100644 --- a/apps/labrinth/src/routes/internal/delphi.rs +++ b/apps/labrinth/src/routes/internal/delphi/mod.rs @@ -34,6 +34,8 @@ use crate::{ util::{error::Context, guards::admin_key_guard}, }; +pub mod rescan; + pub fn config(cfg: &mut web::ServiceConfig) { cfg.service( web::scope("delphi") @@ -197,7 +199,10 @@ async fn ingest_report_deserialized( report.send_to_slack(&pool, &redis).await.ok(); - let mut transaction = pool.begin().await?; + let mut transaction = pool + .begin() + .await + .wrap_internal_err("failed to begin Delphi ingest transaction")?; let report_id = DBDelphiReport { id: DelphiReportId(0), // This will be set by the database @@ -208,7 +213,8 @@ async fn ingest_report_deserialized( severity: report.severity, } .upsert(&mut transaction) - .await?; + .await + .wrap_internal_err("failed to upsert Delphi report")?; info!( num_issues = %report.issues.len(), @@ -292,8 +298,9 @@ async fn ingest_report_deserialized( report_id, issue_type: "__dummy".into(), } - .insert(&mut transaction) - .await?; + .upsert(&mut transaction) + .await + .wrap_internal_err("failed to upsert dummy Delphi report issue")?; ReportIssueDetail { id: DelphiReportIssueDetailsId(0), // This will be set by the database @@ -307,7 +314,10 @@ async fn ingest_report_deserialized( status: DelphiStatus::Pending, } .insert(&mut transaction) - .await?; + .await + .wrap_internal_err( + "failed to insert dummy Delphi report issue detail", + )?; } for (issue_type, issue_details) in report.issues { @@ -316,12 +326,14 @@ async fn ingest_report_deserialized( report_id, issue_type, } - .insert(&mut transaction) - .await?; + .upsert(&mut transaction) + .await + .wrap_internal_err("failed to upsert Delphi report issue")?; // This is required to handle the case where the same Delphi version is re-run on the same file ReportIssueDetail::remove_all_by_issue_id(issue_id, &mut transaction) - .await?; + .await + .wrap_internal_err("failed to remove old Delphi issue details")?; for issue_detail in issue_details { let decompiled_source = @@ -339,11 +351,15 @@ async fn ingest_report_deserialized( status: DelphiStatus::Pending, } .insert(&mut transaction) - .await?; + .await + .wrap_internal_err("failed to insert Delphi issue detail")?; } } - transaction.commit().await?; + transaction + .commit() + .await + .wrap_internal_err("failed to commit Delphi ingest transaction")?; Ok(()) } diff --git a/apps/labrinth/src/routes/internal/delphi/rescan.rs b/apps/labrinth/src/routes/internal/delphi/rescan.rs new file mode 100644 index 0000000000..197369c527 --- /dev/null +++ b/apps/labrinth/src/routes/internal/delphi/rescan.rs @@ -0,0 +1,144 @@ +use eyre::{Result, WrapErr, eyre}; +use futures::future::try_join_all; +use tracing::info; + +use super::{DELPHI_CLIENT, DelphiRunParameters}; +use crate::{database::PgPool, env::ENV, models::ids::FileId}; + +pub async fn rescan_projects_in_queue(pool: &PgPool) -> Result<()> { + let delphi_version = fetch_delphi_version().await?; + let old_delphi_version = fetch_stored_delphi_version(pool).await?; + + if old_delphi_version == Some(delphi_version) { + info!( + ?delphi_version, + "Delphi version unchanged; skipping startup tech review rescan" + ); + return Ok(()); + } + + info!( + ?old_delphi_version, + ?delphi_version, + delphi_version, + "Delphi version changed; rescanning tech review queue" + ); + + let project_ids = fetch_unreviewed_tech_review_project_ids(pool).await?; + if project_ids.is_empty() { + info!("No fully unreviewed tech review projects found to rescan"); + return Ok(()); + } + + let file_ids = fetch_project_file_ids(pool, &project_ids).await?; + if file_ids.is_empty() { + info!( + project_count = project_ids.len(), + "No files found for tech review projects selected for rescan" + ); + return Ok(()); + } + + let file_ids = file_ids + .into_iter() + .map(|file_id| FileId(file_id.cast_unsigned())); + + try_join_all(file_ids.map(|file_id| async move { + super::run(pool, DelphiRunParameters { file_id }) + .await + .wrap_err_with(|| { + eyre!("failed to submit Delphi rescan for `{file_id:?}`") + }) + })) + .await?; + + info!( + project_count = project_ids.len(), + "Submitted Delphi rescans for all unreviewed tech review project files" + ); + + Ok(()) +} + +async fn fetch_delphi_version() -> Result { + let response = DELPHI_CLIENT + .get(format!("{}/version", ENV.DELPHI_URL)) + .send() + .await + .and_then(|res| res.error_for_status()) + .wrap_err("failed to fetch Delphi version")?; + + let version = response + .text() + .await + .wrap_err("failed to read Delphi version response body")?; + let version = version.trim().parse::().wrap_err_with(|| { + eyre!("invalid Delphi version response body: {version}") + })?; + Ok(version) +} + +async fn fetch_stored_delphi_version(pool: &PgPool) -> Result> { + let row = + sqlx::query_scalar!("SELECT MAX(delphi_version) FROM delphi_reports") + .fetch_one(pool) + .await + .wrap_err("failed to fetch latest stored Delphi version")?; + Ok(row) +} + +async fn fetch_unreviewed_tech_review_project_ids( + pool: &PgPool, +) -> Result> { + sqlx::query_scalar!( + r#" + SELECT DISTINCT m.id + FROM mods m + WHERE + EXISTS( + SELECT 1 + FROM delphi_issue_details_with_statuses didws + INNER JOIN delphi_report_issues dri ON dri.id = didws.issue_id + WHERE + didws.project_id = m.id + AND didws.status = 'pending' + -- see delphi.rs todo comment + AND dri.issue_type != '__dummy' + ) + AND NOT EXISTS( + SELECT 1 + FROM delphi_issue_details_with_statuses didws + INNER JOIN delphi_report_issues dri ON dri.id = didws.issue_id + WHERE + didws.project_id = m.id + AND didws.status IN ('safe', 'unsafe') + -- see delphi.rs todo comment + AND dri.issue_type != '__dummy' + ) + "#, + ) + .fetch_all(pool) + .await + .wrap_err("failed to fetch fully unreviewed tech review project ids") +} + +async fn fetch_project_file_ids( + pool: &PgPool, + project_ids: &[i64], +) -> Result> { + let rows = sqlx::query_scalar!( + r#" + SELECT DISTINCT dr.file_id + FROM delphi_reports dr + INNER JOIN files f ON f.id = dr.file_id + INNER JOIN versions v ON v.id = f.version_id + WHERE v.mod_id = ANY($1::bigint[]) + "#, + project_ids, + ) + .fetch_all(pool) + .await + .wrap_err("failed to fetch file ids for tech review Delphi rescan")?; + + Ok(rows.into_iter().flatten().collect()) +}