Replace custom state management with conducer crate#1043
Closed
Replace custom state management with conducer crate#1043
Conversation
Replace the internal Producer/Consumer/Weak state machinery and waiter system with the conducer crate, which provides the same API. Key changes: - Add conducer 0.1 dependency - Remove model/state.rs and model/waiter.rs (~616 lines) - Add From<conducer::Error> for crate::Error conversion - Replace waiter_fn -> conducer::wait - Replace state.abort(err) -> state.close(conducer::Error::Closed) - Simplify TrackConsumer::closed() since conducer only returns Closed/Dropped (both are normal closures) https://claude.ai/code/session_01MdwQB5R1RF7GY3DVqwdxS2
Update to conducer 0.1.1 which removes conducer::Error in favor of Option returns. Each internal State type now has an `abort: Option<Error>` field that stores the error when explicitly aborted. Key changes: - Remove From<conducer::Error> impl (conducer::Error no longer exists) - Add `abort: Option<Error>` to FrameState, GroupState, track::State, broadcast::State - Add `err()` helper on all Producer/Consumer wrappers that borrows state to check abort, falling back to Error::Dropped - Add `Weak<State>` to all Consumer types so they can read the abort error after the channel closes - Convert all modify()/produce()/poll() None returns to the stored abort error via ok_or_else(|| self.err()) - Set state.abort before calling state.close() in all abort methods https://claude.ai/code/session_01MdwQB5R1RF7GY3DVqwdxS2
Replaces `self.state.modify().ok_or_else(|| self.err())?` with `self.modify()?` across FrameProducer, GroupProducer, TrackProducer, BroadcastProducer, and BroadcastDynamic. https://claude.ai/code/session_01MdwQB5R1RF7GY3DVqwdxS2
access() returns ProducerAccess::Mut when open or ProducerAccess::Ref when closed, reading the abort error in a single lock acquisition instead of locking once for modify() and again for borrow() on failure. https://claude.ai/code/session_01MdwQB5R1RF7GY3DVqwdxS2
…ures Instead of storing a Weak<State> on each consumer to read the abort error after channel close, the poll_* functions now return Result so the error propagates naturally through the poll closure. This eliminates the need for a separate err() call and the extra Weak reference. Pattern: .await.ok_or(Error::Dropped)?? — outer ? unwraps the Option (None = dropped without abort), inner ? unwraps the Result (Err = aborted). https://claude.ai/code/session_01MdwQB5R1RF7GY3DVqwdxS2
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
This PR replaces the custom
state.rsandwaiter.rsmodules with the externalconducercrate, which provides equivalent producer-consumer state management functionality.Summary
Removed ~600 lines of custom state management code and replaced it with the
conducercrate dependency. This simplifies the codebase by leveraging a dedicated, well-tested library for producer-consumer patterns.Key Changes
state.rsandwaiter.rscontaining customProducer,Consumer,Weak, andWaiterimplementationsconducer = "0.1"inCargo.tomlProducer,Consumer,Weak, andWaiterfromconducercratestate.abort(err)calls withstate.close(conducer::Error::Closed)waiter_fn()withwait()from conducerconducer::Errorand localErrortype via newFromimplementationunused()calls withOk()to handle result type differencesclosed()return values to convertconducer::Errorto localErrortrack.rs,broadcast.rs,frame.rs,group.rs, anderror.rsImplementation Details
From<conducer::Error>implementation inerror.rsto handle conversion between the two error typeshttps://claude.ai/code/session_01MdwQB5R1RF7GY3DVqwdxS2