Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions app/artifact-cas/internal/service/bytestream.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright 2024 The Chainloop Authors.
// Copyright 2024-2026 The Chainloop Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -75,15 +75,15 @@ func (s *ByteStreamService) Write(stream bytestream.ByteStream_WriteServer) erro
return kerrors.BadRequest("resource name", err.Error())
}

backend, err := s.loadBackend(ctx, info.BackendType, info.StoredSecretID)
storageBackend, err := s.loadBackend(ctx, info.BackendType, info.StoredSecretID)
if err != nil && kerrors.IsNotFound(err) {
return err
} else if err != nil {
return sl.LogAndMaskErr(err, s.log)
}

// We check if the file already exists even before we wait for the whole buffer to be filled
if exists, err := backend.Exists(ctx, req.resource.Digest); err != nil {
if exists, err := storageBackend.Exists(ctx, req.resource.Digest); err != nil {
return sl.LogAndMaskErr(err, s.log)
} else if exists {
s.log.Infow("msg", "artifact already exists", "digest", req.resource.Digest)
Expand All @@ -95,6 +95,9 @@ func (s *ByteStreamService) Write(stream bytestream.ByteStream_WriteServer) erro
buffer := newStreamReader(info.MaxBytes)
// Add data from first request
if err = buffer.Write(req.GetData()); err != nil {
if backend.IsUploadSizeExceeded(err) {
return status.Error(codes.ResourceExhausted, err.Error())
}
return sl.LogAndMaskErr(err, s.log)
}

Expand All @@ -115,11 +118,14 @@ func (s *ByteStreamService) Write(stream bytestream.ByteStream_WriteServer) erro
s.log.Infow("msg", "upload canceled", "digest", req.resource.Digest, "name", req.resource.FileName)
return nil
}
if backend.IsUploadSizeExceeded(err) {
return status.Error(codes.ResourceExhausted, err.Error())
}
return sl.LogAndMaskErr(err, s.log)
}

s.log.Infow("msg", "artifact received, uploading now to backend", "name", req.resource.FileName, "digest", req.resource.Digest, "size", buffer.size)
if err := backend.Upload(ctx, buffer, req.resource); err != nil {
if err := storageBackend.Upload(ctx, buffer, req.resource); err != nil {
return sl.LogAndMaskErr(err, s.log)
}

Expand Down Expand Up @@ -245,7 +251,7 @@ func (r *streamReader) Write(data []byte) error {
// Check if the size of the buffer has exceeded the maximum allowed size
// if maxSize is 0, then there is no limit
if r.maxSize != 0 && r.size > r.maxSize {
return fmt.Errorf("max size of upload exceeded: want=%d, max=%d", r.size, r.maxSize)
return backend.NewErrUploadSizeExceeded(r.size, r.maxSize)
}

_, err := r.Buffer.Write(data)
Expand Down
30 changes: 28 additions & 2 deletions app/artifact-cas/internal/service/bytestream_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright 2024 The Chainloop Authors.
// Copyright 2024-2026 The Chainloop Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -23,6 +23,7 @@ import (
"errors"
"io"
"net"
"strconv"
"testing"

v1 "github.com/chainloop-dev/chainloop/app/artifact-cas/api/cas/v1"
Expand Down Expand Up @@ -77,6 +78,7 @@ func (s *bytestreamSuite) TestStreamReaderOverflow() {
s.Equal(int64(5), buffer.size)
err = buffer.Write([]byte("chainloop"))
s.Error(err)
s.True(backend.IsUploadSizeExceeded(err))
s.ErrorContains(err, "max size of upload exceeded")
}

Expand Down Expand Up @@ -119,6 +121,25 @@ func (s *bytestreamSuite) TestWrite() {
_, err = stream.CloseAndRecv()
assertGRPCError(t, err, codes.InvalidArgument, "resourceName must be set")
})

s.T().Run("max size exceeded", func(t *testing.T) {
ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs("role", "uploader", "max-bytes", "8"))
s.ociBackend.On("Exists", mock.Anything, s.resource.Digest).Return(false, nil)

stream, err := s.client.Write(ctx)
s.NoError(err)
s.NoError(stream.Send(&bytestream.WriteRequest{
ResourceName: encodeResource(s.T(), s.resource),
Data: []byte("hello"),
}))
s.NoError(stream.Send(&bytestream.WriteRequest{
ResourceName: encodeResource(s.T(), s.resource),
Data: []byte("chainloop"),
}))

_, err = stream.CloseAndRecv()
assertGRPCError(t, err, codes.ResourceExhausted, "max size of upload exceeded")
})
}

// NOTE: separated test cases for each error case to make sure the context and stubs are re-set
Expand Down Expand Up @@ -171,7 +192,7 @@ func (s *bytestreamSuite) TestWriteErrorUploading() {
}))

_, err = stream.CloseAndRecv()
s.Error(err)
assertGRPCError(s.T(), err, codes.Internal, "")
}

func (s *bytestreamSuite) TestRead() {
Expand Down Expand Up @@ -298,6 +319,11 @@ func (s *bytestreamSuite) SetupTest() {
claims := &casJWT.Claims{
StoredSecretID: "secret-id", BackendType: backendType,
}
if maxBytes := md.Get("max-bytes"); len(maxBytes) > 0 {
parsedMaxBytes, err := strconv.ParseInt(maxBytes[0], 10, 64)
require.NoError(s.T(), err)
claims.MaxBytes = parsedMaxBytes
}

if roles := md.Get("role"); len(roles) > 0 {
if roles[0] == "downloader" {
Expand Down
19 changes: 18 additions & 1 deletion pkg/blobmanager/errors.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright 2023 The Chainloop Authors.
// Copyright 2023-2026 The Chainloop Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -35,3 +35,20 @@ func (e ErrNotFound) Error() string {
func IsNotFound(err error) bool {
return errors.As(err, &ErrNotFound{})
}

type ErrUploadSizeExceeded struct {
want int64
max int64
}

func NewErrUploadSizeExceeded(want, maxSize int64) ErrUploadSizeExceeded {
return ErrUploadSizeExceeded{want: want, max: maxSize}
}

func (e ErrUploadSizeExceeded) Error() string {
return fmt.Sprintf("max size of upload exceeded: want=%d, max=%d", e.want, e.max)
}

func IsUploadSizeExceeded(err error) bool {
return errors.As(err, &ErrUploadSizeExceeded{})
}
Loading