Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,22 @@ public interface BlobReadSession extends AutoCloseable, Closeable {
@TransportCompatibility({Transport.GRPC})
<Projection> Projection readAs(ReadProjectionConfig<Projection> config);

/**
* Read all {@code configs} from this session as a specific {@code Projection} as dictated by the
* provided {@code configs}.
*
* <p>This allows for batching multiple reads into a single request, which can be more efficient
* than calling {@link #readAs(ReadProjectionConfig)} in a loop.
*
* @see ReadProjectionConfig
* @see ReadProjectionConfigs
* @since 2.51.0 This new api is in preview and is subject to breaking changes.
*/
@BetaApi
@TransportCompatibility({Transport.GRPC})
<Projection> java.util.List<Projection> readAllAs(
java.util.List<ReadProjectionConfig<Projection>> configs);

/**
* Close this session and any {@code Projection}s produced by {@link
* #readAs(ReadProjectionConfig)}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,23 @@ public <Projection> Projection readAs(ReadProjectionConfig<Projection> config) {
return projection;
}

@SuppressWarnings({"rawtypes", "unchecked"})
@Override
public <Projection> java.util.List<Projection> readAllAs(
java.util.List<ReadProjectionConfig<Projection>> configs) {
java.util.List<Projection> projections = session.readAllAs(configs);
java.util.List<Projection> wrapped = new java.util.ArrayList<>(projections.size());
for (Projection projection : projections) {
if (projection instanceof ApiFuture) {
ApiFuture apiFuture = (ApiFuture) projection;
wrapped.add((Projection) StorageException.coalesceAsync(apiFuture));
} else {
wrapped.add(projection);
}
}
return wrapped;
}

@Override
public void close() throws IOException {
session.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,7 @@ interface ObjectReadSession extends IOAutoCloseable {
Object getResource();

<Projection> Projection readAs(ReadProjectionConfig<Projection> config);

<Projection> java.util.List<Projection> readAllAs(
java.util.List<ReadProjectionConfig<Projection>> configs);
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,46 @@ public <Projection> Projection readAs(ReadProjectionConfig<Projection> config) {
}
}

@Override
public <Projection> java.util.List<Projection> readAllAs(
java.util.List<ReadProjectionConfig<Projection>> configs) {
checkState(open, "Session already closed");
java.util.List<Projection> results = new ArrayList<>(configs.size());

// Filter for STREAM_READ configs to batch
List<ObjectReadSessionStreamRead<Projection>> batchedReads = new ArrayList<>();
List<Long> batchedReadIds = new ArrayList<>();

for (ReadProjectionConfig<Projection> config : configs) {
switch (config.getType()) {
case STREAM_READ:
long readId = state.newReadId();
ObjectReadSessionStreamRead<Projection> read =
config.cast().newRead(readId, retryContextProvider.create());
batchedReads.add(read);
batchedReadIds.add(readId);
results.add(read.project());
break;
case SESSION_USER:
results.add(config.project(this, IOAutoCloseable.noOp()));
break;
default:
throw new IllegalStateException(
String.format(
Locale.US,
"Broken java enum %s value=%s",
ProjectionType.class.getName(),
config.getType().name()));
}
}

if (!batchedReads.isEmpty()) {
registerBatchReadsInState(batchedReadIds, batchedReads);
}

return results;
}

@Override
public void close() throws IOException {
try {
Expand Down Expand Up @@ -137,6 +177,61 @@ private void registerReadInState(long readId, ObjectReadSessionStreamRead<?> rea
}
}

private void registerBatchReadsInState(
List<Long> readIds, List<? extends ObjectReadSessionStreamRead<?>> reads) {
// 1. Check if the current state can handle ALL new reads
// We assume if it can handle one valid read of the batch, it can handle all,
// BUT we must ensure transactionality or consistency if they differ.
// However, usually they differ only by offset/length.
// Let's check all or check if they are compatible with each other + state.
// For now, we check if state can handle the first one?
// Correctness: we should check all or rely on the fact they come from same session/context.
// If state implies "same generation/metadata", then all reads should be fine if one is fine.
// Let's verify strict correctness:
boolean allCompatible = true;
for (ObjectReadSessionStreamRead<?> read : reads) {
if (!state.canHandleNewRead(read)) {
allCompatible = false;
break;
}
}

// Prepare Request Builder
BidiReadObjectRequest.Builder requestBuilder = BidiReadObjectRequest.newBuilder();
for (ObjectReadSessionStreamRead<?> read : reads) {
requestBuilder.addReadRanges(read.makeReadRange());
}
BidiReadObjectRequest request = requestBuilder.build();

if (allCompatible) {
for (int i = 0; i < readIds.size(); i++) {
state.putOutstandingRead(readIds.get(i), reads.get(i));
}
stream.send(request);
} else {
// Fork a new child for this BATCH
ObjectReadSessionState child = state.forkChild();
ObjectReadSessionStream newStream =
ObjectReadSessionStream.create(executor, callable, child, retryContextProvider.create());
children.put(newStream, child);

// We only need one close callback for the stream closure, so we can attach it to the first read
// or all? If we attach to all, we might try to close multiple times, which is fine (concurrent map remove).
IOAutoCloseable closeCallback =
() -> {
children.remove(newStream);
newStream.close();
};

for (int i = 0; i < readIds.size(); i++) {
ObjectReadSessionStreamRead<?> read = reads.get(i);
read.setOnCloseCallback(closeCallback);
child.putOutstandingRead(readIds.get(i), read);
}
newStream.send(request);
}
}

@VisibleForTesting
static final class ConcurrentIdentityMap<K, V> {
private final ReentrantLock lock;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -1989,6 +1990,36 @@ public <Projection> Projection readAs(ReadProjectionConfig<Projection> config) {
}
}

@Override
public <Projection> java.util.List<Projection> readAllAs(
java.util.List<ReadProjectionConfig<Projection>> configs) {
java.util.List<Span> spans = new ArrayList<>(configs.size());
java.util.List<ReadProjectionConfig<Projection>> otrConfigs =
new ArrayList<>(configs.size());

for (ReadProjectionConfig<Projection> config : configs) {
Span readRangeSpan =
tracer
.spanBuilder(BLOB_READ_SESSION + "/readAs")
.setAttribute("gsutil.uri", id.toGsUtilUriWithGeneration())
.setParent(blobReadSessionContext)
.startSpan();
spans.add(readRangeSpan);
otrConfigs.add(new OtelReadProjectionConfig<>(config, readRangeSpan));
}

try {
return delegate.readAllAs(otrConfigs);
} catch (Throwable t) {
for (Span span : spans) {
span.recordException(t);
span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName());
span.end();
}
throw t;
}
}

@Override
public void close() throws IOException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ public <Projection> Projection readAs(ReadProjectionConfig<Projection> config) {
return null;
}

@Override
public <Projection> java.util.List<Projection> readAllAs(
java.util.List<ReadProjectionConfig<Projection>> configs) {
throw new UnsupportedOperationException();
}

@Override
public void close() throws IOException {
sessionCloseCount.getAndIncrement();
Expand Down
10 changes: 10 additions & 0 deletions renovate.json
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,16 @@
"^ch.qos.logback:logback-classic"
],
"allowedVersions": "<1.4.0"
},
{
"description": "Disable updates for templatized GitHub Actions",
"matchManagers": [
"github-actions"
],
"matchFileNames": [
".github/workflows/**"
],
"enabled": false
}
],
"semanticCommits": true,
Expand Down
Loading