Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cloudbuild/zb-system-tests-cloudbuild.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ steps:
# Execute the script on the VM via SSH.
# Capture the exit code to ensure cleanup happens before the build fails.
set +e
gcloud compute ssh ${_VM_NAME} --zone=${_ZONE} --internal-ip --ssh-key-file=/workspace/.ssh/google_compute_engine --command="ulimit -n {_ULIMIT}; COMMIT_SHA=${COMMIT_SHA} _ZONAL_BUCKET=${_ZONAL_BUCKET} _PR_NUMBER=${_PR_NUMBER} bash run_zonal_tests.sh"
gcloud compute ssh ${_VM_NAME} --zone=${_ZONE} --internal-ip --ssh-key-file=/workspace/.ssh/google_compute_engine --command="ulimit -n {_ULIMIT}; COMMIT_SHA=${COMMIT_SHA} _ZONAL_BUCKET=${_ZONAL_BUCKET} CROSS_REGION_BUCKET=${_CROSS_REGION_BUCKET} _PR_NUMBER=${_PR_NUMBER} bash run_zonal_tests.sh"
EXIT_CODE=$?
set -e

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,8 @@ def __init__(
self._read_id_to_download_ranges_id = {}
self._download_ranges_id_to_pending_read_ids = {}
self.persisted_size: Optional[int] = None # updated after opening the stream
self._open_retries: int = 0


async def __aenter__(self):
"""Opens the underlying bidi-gRPC connection to read from the object."""
Expand Down Expand Up @@ -257,13 +259,18 @@ async def open(
raise ValueError("Underlying bidi-gRPC stream is already open")

if retry_policy is None:
def on_error_wrapper(exc):
self._open_retries += 1
self._on_open_error(exc)

retry_policy = AsyncRetry(
predicate=_is_read_retryable, on_error=self._on_open_error
predicate=_is_read_retryable, on_error=on_error_wrapper
)
else:
original_on_error = retry_policy._on_error

def combined_on_error(exc):
self._open_retries += 1
self._on_open_error(exc)
if original_on_error:
original_on_error(exc)
Expand Down
46 changes: 46 additions & 0 deletions tests/system/test_zonal.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
# TODO: replace this with a fixture once zonal bucket creation / deletion
# is supported in grpc client or json client.
_ZONAL_BUCKET = os.getenv("ZONAL_BUCKET")
_CROSS_REGION_BUCKET = os.getenv("CROSS_REGION_BUCKET")
_BYTES_TO_UPLOAD = b"dummy_bytes_to_write_read_and_delete_appendable_object"


Expand Down Expand Up @@ -82,6 +83,51 @@ def _get_equal_dist(a: int, b: int) -> tuple[int, int]:
return a + step, a + 2 * step


@pytest.mark.parametrize(
    "object_size",
    [
        256,  # smaller than one chunk
        10 * 1024 * 1024,  # under _MAX_BUFFER_SIZE_BYTES
        20 * 1024 * 1024,  # over _MAX_BUFFER_SIZE
    ],
)
def test_basic_wrd_x_region(
    storage_client,
    blobs_to_delete,
    object_size,
    event_loop,
    grpc_client,
):
    """Write, read back, and verify an appendable object in the cross-region bucket.

    Appends ``object_size`` random bytes, finalizes the object, then downloads
    it with a multi-range downloader and checks size, CRC32C, and content.
    """
    object_name = f"test_basic_wrd-{str(uuid.uuid4())}"

    async def _exercise():
        payload = os.urandom(object_size)
        expected_crc = google_crc32c.value(payload)

        appender = AsyncAppendableObjectWriter(
            grpc_client, _CROSS_REGION_BUCKET, object_name
        )
        await appender.open()
        await appender.append(payload)
        metadata = await appender.close(finalize_on_close=True)
        assert metadata.size == object_size
        assert int(metadata.checksums.crc32c) == expected_crc

        sink = BytesIO()
        downloader = AsyncMultiRangeDownloader(
            grpc_client, _CROSS_REGION_BUCKET, object_name
        )
        async with downloader:
            # NOTE(review): opening against a cross-region bucket is expected
            # to retry exactly once — confirm against the reader's retry logic.
            assert downloader._open_retries == 1
            # A (0, 0) range requests the entire object.
            await downloader.download_ranges([(0, 0, sink)])
            assert downloader.persisted_size == object_size

        assert sink.getvalue() == payload

        # Register the object for cleanup through the JSON client fixture.
        blobs_to_delete.append(
            storage_client.bucket(_CROSS_REGION_BUCKET).blob(object_name)
        )
        del appender
        gc.collect()

    event_loop.run_until_complete(_exercise())

@pytest.mark.parametrize(
"object_size",
[
Expand Down