Skip to content

Conversation

@dcherian
Copy link

@dcherian dcherian commented Dec 4, 2024

for block in serialize_data.blocks:
flat = data.ravel()
for start in range(0, data.size, chunk_size):
block = flat[slice(start, chunk_size)]
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be

Suggested change
block = flat[slice(start, chunk_size)]
block = flat[slice(start, start+chunk_size)]

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

clearly this needs tests!

# we load one `DASK_ENCODE_CHUNK_SIZE`-sized block of linearized data
# in to memory at one go. This may overlap with multiple dask chunks
# so lets cache those chunks since we might come back to them.
cache = Cache(Config.DASK_CACHE_SIZE)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be configured at the server level? That is what we do

Copy link
Author

@dcherian dcherian Dec 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now I think we should apply the cache more locally in that loop in dods_encode. We want to cache aggressively when we have multiple batches to stream out for a single request from a single array. This is because the order in which we yield bytes can be orthogonal to chunking, and we can visit the same chunk multiple times.

I think the more global server cache is appropriate for a less aggressive cache across multiple requests.

Perhaps we can pair at some point and just iterate through some options with a benchmark problem.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also seems like a good place to stick in a bit of async: compute the next iteration while streaming out the current iteration.

if isinstance(data, da.Array):
block = data[slice(start, end)].compute()
elif has_xarray and isinstance(data, Variable):
npidxr = np.unravel_index(np.arange(start, min(end, data.size)), shape=data.shape)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

image for 30MB blocks,

@dcherian dcherian changed the title Add dask caching, avoid rechunk Always stream out blocks in dods_encode Dec 4, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants