From ac304d437ce2d86d775e725965575c9b357e34c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mat=C3=ADas=20Insaurralde?= Date: Wed, 25 Feb 2026 21:26:00 -0300 Subject: [PATCH] fix(artifact-cas): return ResourceExhausted for oversized uploads MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Matías Insaurralde --- .../internal/service/bytestream.go | 16 ++++++---- .../internal/service/bytestream_test.go | 30 +++++++++++++++++-- pkg/blobmanager/errors.go | 19 +++++++++++- 3 files changed, 57 insertions(+), 8 deletions(-) diff --git a/app/artifact-cas/internal/service/bytestream.go b/app/artifact-cas/internal/service/bytestream.go index ea6884821..748fe898a 100644 --- a/app/artifact-cas/internal/service/bytestream.go +++ b/app/artifact-cas/internal/service/bytestream.go @@ -1,5 +1,5 @@ // -// Copyright 2024 The Chainloop Authors. +// Copyright 2024-2026 The Chainloop Authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -75,7 +75,7 @@ func (s *ByteStreamService) Write(stream bytestream.ByteStream_WriteServer) erro return kerrors.BadRequest("resource name", err.Error()) } - backend, err := s.loadBackend(ctx, info.BackendType, info.StoredSecretID) + storageBackend, err := s.loadBackend(ctx, info.BackendType, info.StoredSecretID) if err != nil && kerrors.IsNotFound(err) { return err } else if err != nil { @@ -83,7 +83,7 @@ func (s *ByteStreamService) Write(stream bytestream.ByteStream_WriteServer) erro } // We check if the file already exists even before we wait for the whole buffer to be filled - if exists, err := backend.Exists(ctx, req.resource.Digest); err != nil { + if exists, err := storageBackend.Exists(ctx, req.resource.Digest); err != nil { return sl.LogAndMaskErr(err, s.log) } else if exists { s.log.Infow("msg", "artifact already exists", "digest", req.resource.Digest) @@ -95,6 +95,9 @@ func (s *ByteStreamService) Write(stream bytestream.ByteStream_WriteServer) erro buffer := newStreamReader(info.MaxBytes) // Add data from first request if err = buffer.Write(req.GetData()); err != nil { + if backend.IsUploadSizeExceeded(err) { + return status.Error(codes.ResourceExhausted, err.Error()) + } return sl.LogAndMaskErr(err, s.log) } @@ -115,11 +118,14 @@ func (s *ByteStreamService) Write(stream bytestream.ByteStream_WriteServer) erro s.log.Infow("msg", "upload canceled", "digest", req.resource.Digest, "name", req.resource.FileName) return nil } + if backend.IsUploadSizeExceeded(err) { + return status.Error(codes.ResourceExhausted, err.Error()) + } return sl.LogAndMaskErr(err, s.log) } s.log.Infow("msg", "artifact received, uploading now to backend", "name", req.resource.FileName, "digest", req.resource.Digest, "size", buffer.size) - if err := backend.Upload(ctx, buffer, req.resource); err != nil { + if err := storageBackend.Upload(ctx, buffer, req.resource); err != nil { return sl.LogAndMaskErr(err, s.log) } @@ -245,7 +251,7 @@ func (r *streamReader) Write(data []byte) error { // Check if the size of the buffer has exceeded the maximum allowed size // if maxSize is 0, then there is no limit if r.maxSize != 0 && r.size > r.maxSize { - return fmt.Errorf("max size of upload exceeded: want=%d, max=%d", r.size, r.maxSize) + return backend.NewErrUploadSizeExceeded(r.size, r.maxSize) } _, err := r.Buffer.Write(data) diff --git a/app/artifact-cas/internal/service/bytestream_test.go b/app/artifact-cas/internal/service/bytestream_test.go index 0d095b553..518fb5620 100644 --- a/app/artifact-cas/internal/service/bytestream_test.go +++ b/app/artifact-cas/internal/service/bytestream_test.go @@ -1,5 +1,5 @@ // -// Copyright 2024 The Chainloop Authors. +// Copyright 2024-2026 The Chainloop Authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -23,6 +23,7 @@ import ( "errors" "io" "net" + "strconv" "testing" v1 "github.com/chainloop-dev/chainloop/app/artifact-cas/api/cas/v1" @@ -77,6 +78,7 @@ func (s *bytestreamSuite) TestStreamReaderOverflow() { s.Equal(int64(5), buffer.size) err = buffer.Write([]byte("chainloop")) s.Error(err) + s.True(backend.IsUploadSizeExceeded(err)) s.ErrorContains(err, "max size of upload exceeded") } @@ -119,6 +121,25 @@ func (s *bytestreamSuite) TestWrite() { _, err = stream.CloseAndRecv() assertGRPCError(t, err, codes.InvalidArgument, "resourceName must be set") }) + + s.T().Run("max size exceeded", func(t *testing.T) { + ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs("role", "uploader", "max-bytes", "8")) + s.ociBackend.On("Exists", mock.Anything, s.resource.Digest).Return(false, nil) + + stream, err := s.client.Write(ctx) + s.NoError(err) + s.NoError(stream.Send(&bytestream.WriteRequest{ + ResourceName: encodeResource(s.T(), s.resource), + Data: []byte("hello"), + })) + s.NoError(stream.Send(&bytestream.WriteRequest{ + ResourceName: encodeResource(s.T(), s.resource), + Data: []byte("chainloop"), + })) + + _, err = stream.CloseAndRecv() + assertGRPCError(t, err, codes.ResourceExhausted, "max size of upload exceeded") + }) } // NOTE: separated test cases for each error case to make sure the context and stubs are re-set @@ -171,7 +192,7 @@ func (s *bytestreamSuite) TestWriteErrorUploading() { })) _, err = stream.CloseAndRecv() - s.Error(err) + assertGRPCError(s.T(), err, codes.Internal, "") } func (s *bytestreamSuite) TestRead() { @@ -298,6 +319,11 @@ func (s *bytestreamSuite) SetupTest() { claims := &casJWT.Claims{ StoredSecretID: "secret-id", BackendType: backendType, } + if maxBytes := md.Get("max-bytes"); len(maxBytes) > 0 { + parsedMaxBytes, err := strconv.ParseInt(maxBytes[0], 10, 64) + require.NoError(s.T(), err) + claims.MaxBytes = parsedMaxBytes + } if roles := md.Get("role"); len(roles) > 0 { if roles[0] == "downloader" { diff --git a/pkg/blobmanager/errors.go b/pkg/blobmanager/errors.go index 540234015..803f6cff1 100644 --- a/pkg/blobmanager/errors.go +++ b/pkg/blobmanager/errors.go @@ -1,5 +1,5 @@ // -// Copyright 2023 The Chainloop Authors. +// Copyright 2023-2026 The Chainloop Authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -35,3 +35,20 @@ func (e ErrNotFound) Error() string { func IsNotFound(err error) bool { return errors.As(err, &ErrNotFound{}) } + +type ErrUploadSizeExceeded struct { + want int64 + max int64 +} + +func NewErrUploadSizeExceeded(want, maxSize int64) ErrUploadSizeExceeded { + return ErrUploadSizeExceeded{want: want, max: maxSize} +} + +func (e ErrUploadSizeExceeded) Error() string { + return fmt.Sprintf("max size of upload exceeded: want=%d, max=%d", e.want, e.max) +} + +func IsUploadSizeExceeded(err error) bool { + return errors.As(err, &ErrUploadSizeExceeded{}) +}