Skip to content

Split tile fetching and decoding #37

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions python/python/async_tiff/_decoder.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ from collections.abc import Buffer
from .enums import CompressionMethod

class Decoder(Protocol):
# In the future, we could pass in photometric interpretation and jpeg tables as
# well.
@staticmethod
def __call__(buffer: Buffer) -> Buffer: ...

Expand Down
66 changes: 33 additions & 33 deletions src/async_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ use crate::error::{AiocogeoError, Result};
/// [`tokio::fs::File`]: https://docs.rs/tokio/latest/tokio/fs/struct.File.html
pub trait AsyncFileReader: Debug + Send + Sync {
/// Retrieve the bytes in `range`
fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>>;
fn get_bytes(&self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>>;

/// Retrieve multiple byte ranges. The default implementation will call `get_bytes` sequentially.
fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
fn get_byte_ranges(&self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
async move {
let mut result = Vec::with_capacity(ranges.len());

Expand All @@ -49,37 +49,37 @@ pub trait AsyncFileReader: Debug + Send + Sync {

/// This allows `Box<dyn AsyncFileReader + '_>` to be used as an `AsyncFileReader`.
impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
self.as_mut().get_bytes(range)
fn get_bytes(&self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
self.as_ref().get_bytes(range)
}

fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
self.as_mut().get_byte_ranges(ranges)
fn get_byte_ranges(&self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
self.as_ref().get_byte_ranges(ranges)
}
}

#[cfg(feature = "tokio")]
impl<T: tokio::io::AsyncRead + tokio::io::AsyncSeek + Unpin + Debug + Send + Sync> AsyncFileReader
for T
{
fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
use tokio::io::{AsyncReadExt, AsyncSeekExt};

async move {
self.seek(std::io::SeekFrom::Start(range.start)).await?;

let to_read = (range.end - range.start).try_into().unwrap();
let mut buffer = Vec::with_capacity(to_read);
let read = self.take(to_read as u64).read_to_end(&mut buffer).await?;
if read != to_read {
return Err(AiocogeoError::EndOfFile(to_read, read));
}

Ok(buffer.into())
}
.boxed()
}
}
// #[cfg(feature = "tokio")]
// impl<T: tokio::io::AsyncRead + tokio::io::AsyncSeek + Unpin + Debug + Send + Sync> AsyncFileReader
// for T
// {
// fn get_bytes(&self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
// use tokio::io::{AsyncReadExt, AsyncSeekExt};

// async move {
// self.seek(std::io::SeekFrom::Start(range.start)).await?;

// let to_read = (range.end - range.start).try_into().unwrap();
// let mut buffer = Vec::with_capacity(to_read);
// let read = self.take(to_read as u64).read_to_end(&mut buffer).await?;
// if read != to_read {
// return Err(AiocogeoError::EndOfFile(to_read, read));
// }

// Ok(buffer.into())
// }
// .boxed()
// }
// }

#[derive(Clone, Debug)]
pub struct ObjectReader {
Expand All @@ -97,14 +97,14 @@ impl ObjectReader {
}

impl AsyncFileReader for ObjectReader {
fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
fn get_bytes(&self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
self.store
.get_range(&self.path, range)
.map_err(|e| e.into())
.boxed()
}

fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>>
fn get_byte_ranges(&self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>>
where
Self: Send,
{
Expand All @@ -125,14 +125,14 @@ pub struct PrefetchReader {
}

impl PrefetchReader {
pub async fn new(mut reader: Box<dyn AsyncFileReader>, prefetch: u64) -> Result<Self> {
pub async fn new(reader: Box<dyn AsyncFileReader>, prefetch: u64) -> Result<Self> {
let buffer = reader.get_bytes(0..prefetch).await?;
Ok(Self { reader, buffer })
}
}

impl AsyncFileReader for PrefetchReader {
fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
fn get_bytes(&self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
if range.start < self.buffer.len() as _ {
if range.end < self.buffer.len() as _ {
let usize_range = range.start as usize..range.end as usize;
Expand All @@ -147,7 +147,7 @@ impl AsyncFileReader for PrefetchReader {
}
}

fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>>
fn get_byte_ranges(&self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>>
where
Self: Send,
{
Expand Down
6 changes: 2 additions & 4 deletions src/cog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,8 @@ mod test {

let ifd = &cog_reader.ifds.as_ref()[1];
let decoder_registry = DecoderRegistry::default();
let tile = ifd
.get_tile(0, 0, Box::new(reader), &decoder_registry)
.await
.unwrap();
let tile = ifd.fetch_tile(0, 0, &reader).await.unwrap();
let tile = tile.decode(&decoder_registry).unwrap();
std::fs::write("img.buf", tile).unwrap();
}

Expand Down
20 changes: 0 additions & 20 deletions src/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,26 +116,6 @@ impl Decoder for UncompressedDecoder {
}
}

// https://github.com/image-rs/image-tiff/blob/3bfb43e83e31b0da476832067ada68a82b378b7b/src/decoder/image.rs#L370
pub(crate) fn decode_tile(
buf: Bytes,
photometric_interpretation: PhotometricInterpretation,
compression_method: CompressionMethod,
// compressed_length: u64,
jpeg_tables: Option<&[u8]>,
decoder_registry: &DecoderRegistry,
) -> Result<Bytes> {
let decoder =
decoder_registry
.0
.get(&compression_method)
.ok_or(TiffError::UnsupportedError(
TiffUnsupportedError::UnsupportedCompressionMethod(compression_method),
))?;

decoder.decode_tile(buf, photometric_interpretation, jpeg_tables)
}

// https://github.com/image-rs/image-tiff/blob/3bfb43e83e31b0da476832067ada68a82b378b7b/src/decoder/image.rs#L389-L450
fn decode_modern_jpeg(
buf: Bytes,
Expand Down
63 changes: 32 additions & 31 deletions src/ifd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use bytes::Bytes;
use num_enum::TryFromPrimitive;

use crate::async_reader::AsyncCursor;
use crate::decoder::{decode_tile, DecoderRegistry};
use crate::error::{AiocogeoError, Result};
use crate::geo::{AffineTransform, GeoKeyDirectory, GeoKeyTag};
use crate::tiff::tags::{
Expand All @@ -15,6 +14,7 @@ use crate::tiff::tags::{
};
use crate::tiff::TiffError;
use crate::tiff::Value;
use crate::tile::TiffTile;
use crate::AsyncFileReader;

const DOCUMENT_NAME: u16 = 269;
Expand Down Expand Up @@ -166,7 +166,7 @@ pub struct ImageFileDirectory {

pub(crate) sample_format: Vec<SampleFormat>,

pub(crate) jpeg_tables: Option<Vec<u8>>,
pub(crate) jpeg_tables: Option<Bytes>,

pub(crate) copyright: Option<String>,

Expand Down Expand Up @@ -339,7 +339,7 @@ impl ImageFileDirectory {
.collect(),
);
}
Tag::JPEGTables => jpeg_tables = Some(value.into_u8_vec()?),
Tag::JPEGTables => jpeg_tables = Some(value.into_u8_vec()?.into()),
Tag::Copyright => copyright = Some(value.into_string()?),

// Geospatial tags
Expand Down Expand Up @@ -728,33 +728,33 @@ impl ImageFileDirectory {
Some(offset as _..(offset + byte_count) as _)
}

pub async fn get_tile(
/// Fetch the tile located at `x` column and `y` row using the provided reader.
pub async fn fetch_tile(
&self,
x: usize,
y: usize,
mut reader: Box<dyn AsyncFileReader>,
decoder_registry: &DecoderRegistry,
) -> Result<Bytes> {
reader: &dyn AsyncFileReader,
) -> Result<TiffTile> {
let range = self
.get_tile_byte_range(x, y)
.ok_or(AiocogeoError::General("Not a tiled TIFF".to_string()))?;
let buf = reader.get_bytes(range).await?;
decode_tile(
buf,
self.photometric_interpretation,
self.compression,
self.jpeg_tables.as_deref(),
decoder_registry,
)
let compressed_bytes = reader.get_bytes(range).await?;
Ok(TiffTile {
x,
y,
compressed_bytes,
compression_method: self.compression,
photometric_interpretation: self.photometric_interpretation,
jpeg_tables: self.jpeg_tables.clone(),
})
}

pub async fn get_tiles(
pub async fn fetch_tiles(
&self,
x: &[usize],
y: &[usize],
mut reader: Box<dyn AsyncFileReader>,
decoder_registry: &DecoderRegistry,
) -> Result<Vec<Bytes>> {
reader: &dyn AsyncFileReader,
) -> Result<Vec<TiffTile>> {
assert_eq!(x.len(), y.len(), "x and y should have same len");

// 1: Get all the byte ranges for all tiles
Expand All @@ -770,19 +770,20 @@ impl ImageFileDirectory {
// 2: Fetch using `get_byte_ranges`
let buffers = reader.get_byte_ranges(byte_ranges).await?;

// 3: Decode tiles (in the future, separate API)
let mut decoded_tiles = vec![];
for buf in buffers {
let decoded = decode_tile(
buf,
self.photometric_interpretation,
self.compression,
self.jpeg_tables.as_deref(),
decoder_registry,
)?;
decoded_tiles.push(decoded);
// 3: Create tile objects
let mut tiles = vec![];
for ((compressed_bytes, &x), &y) in buffers.into_iter().zip(x).zip(y) {
let tile = TiffTile {
x,
y,
compressed_bytes,
compression_method: self.compression,
photometric_interpretation: self.photometric_interpretation,
jpeg_tables: self.jpeg_tables.clone(),
};
tiles.push(tile);
}
Ok(decoded_tiles)
Ok(tiles)
}

/// Return the number of x/y tiles in the IFD
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub mod error;
pub mod geo;
mod ifd;
pub mod tiff;
mod tile;

pub use async_reader::{AsyncFileReader, ObjectReader, PrefetchReader};
pub use cog::COGReader;
Expand Down
77 changes: 77 additions & 0 deletions src/tile.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
use bytes::Bytes;

use crate::decoder::DecoderRegistry;
use crate::error::Result;
use crate::tiff::tags::{CompressionMethod, PhotometricInterpretation};
use crate::tiff::{TiffError, TiffUnsupportedError};

/// A TIFF Tile response.
///
/// This contains the required information to decode the tile. Decoding is separated from fetching
/// so that sync and async operations can be separated and non-blocking.
///
/// This is returned by `fetch_tile`.
///
/// The fields are crate-private; outside the crate they are read through the accessor methods
/// of the same names.
#[derive(Debug)]
pub struct TiffTile {
/// Column index of this tile.
pub(crate) x: usize,
/// Row index of this tile.
pub(crate) y: usize,
/// The tile's bytes, still compressed; they are handed to a decoder by `decode`.
pub(crate) compressed_bytes: Bytes,
/// Compression tag for this tile; `decode` uses it to look up a decoder in the registry.
pub(crate) compression_method: CompressionMethod,
/// Photometric interpretation tag for this tile; forwarded to the decoder by `decode`.
pub(crate) photometric_interpretation: PhotometricInterpretation,
/// JPEG tables, if any, from the IFD that produced this tile; forwarded to the decoder.
pub(crate) jpeg_tables: Option<Bytes>,
}

impl TiffTile {
    /// Column index of this tile.
    pub fn x(&self) -> usize {
        self.x
    }

    /// Row index of this tile.
    pub fn y(&self) -> usize {
        self.y
    }

    /// Borrow the still-compressed bytes backing this tile.
    ///
    /// [`Bytes`] is reference-counted, so cloning the returned value is cheap.
    pub fn compressed_bytes(&self) -> &Bytes {
        &self.compressed_bytes
    }

    /// The compression tag associated with this tile.
    pub fn compression_method(&self) -> CompressionMethod {
        self.compression_method
    }

    /// The photometric interpretation tag associated with this tile.
    pub fn photometric_interpretation(&self) -> PhotometricInterpretation {
        self.photometric_interpretation
    }

    /// Borrow the JPEG tables from the IFD that produced this tile, if there are any.
    ///
    /// [`Bytes`] is reference-counted, so cloning the returned value is cheap.
    pub fn jpeg_tables(&self) -> Option<&Bytes> {
        self.jpeg_tables.as_ref()
    }

    /// Decode this tile with the decoder registered for its compression method.
    ///
    /// Decoding is kept apart from fetching so that sync and async operations do not block the
    /// same runtime.
    ///
    /// # Errors
    ///
    /// Returns an unsupported-compression-method error when `decoder_registry` has no entry for
    /// this tile's compression method, and forwards any error reported by the decoder itself.
    pub fn decode(&self, decoder_registry: &DecoderRegistry) -> Result<Bytes> {
        let method = self.compression_method;

        // Bail out early when no decoder is registered for this compression method.
        let decoder = match decoder_registry.as_ref().get(&method) {
            Some(found) => found,
            None => {
                let unsupported = TiffUnsupportedError::UnsupportedCompressionMethod(method);
                return Err(TiffError::UnsupportedError(unsupported).into());
            }
        };

        // Cloning `Bytes` only bumps a reference count; the payload is not copied.
        let payload = self.compressed_bytes.clone();
        let tables = self.jpeg_tables.as_deref();
        decoder.decode_tile(payload, self.photometric_interpretation, tables)
    }
}