http: Compressed response example in Python #35


Merged
merged 35 commits into from
Nov 27, 2024

Conversation


@felipecrv felipecrv commented Sep 6, 2024

@felipecrv
Contributor Author

I still need to write the client that decompresses and parses the stream. I want to measure things like time-to-first-batch and how long it takes to download and consume streams of different compressions algorithms.

For now, the make_requests.sh script makes all kinds of requests and writes the output to several different files. I decompressed the *.arrows.gz file and verified with diff that it is identical to the uncompressed *.arrows file.

If the content-coding of an entity is not "identity", then the
response MUST include a Content-Encoding entity-header (section
14.11) that lists the non-identity content-coding(s) used.

Contributor Author


I need to expand this text to explain how Arrow streams can be piped into a compressed response, perhaps placing it before this summarized explanation of encoding negotiation.
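For illustration, a minimal `http.server` handler that applies a non-identity content-coding and declares it via the `Content-Encoding` header, as the quoted RFC text requires. This is a sketch with a placeholder body, not the example server from this PR:

```python
import gzip
from http.server import BaseHTTPRequestHandler, HTTPServer

class CompressedHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        # Placeholder payload; the real server would pipe an Arrow IPC stream.
        body = gzip.compress(b"an Arrow IPC stream would go here")
        self.send_response(200)
        self.send_header("Content-Type", "application/vnd.apache.arrow.stream")
        # Non-identity content-coding MUST be listed in Content-Encoding.
        self.send_header("Content-Encoding", "gzip")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)
```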

Comment on lines 124 to 140
def check_parser(s, expected):
try:
parsed = parse_accept_encoding(s)
# print("parsed:", parsed, "\nexpected:", expected)
assert parsed == expected
except ValueError as e:
print(e)


check_parser("", [])
expected = [("gzip", None), ("zstd", 1.0), ("*", None)]
check_parser("gzip, zstd;q=1.0, *", expected)
check_parser("gzip , zstd; q= 1.0 , *", expected)
expected = [("gzip", None), ("zstd", 1.0), ("*", 0.0)]
check_parser("gzip , zstd; q= 1.0 \t \r\n ,*;q =0", expected)
expected = [("zstd", 1.0), ("gzip", 0.5), ("br", 0.8), ("identity", 0.0)]
check_parser("zstd;q=1.0, gzip;q=0.5, br;q=0.8, identity;q=0", expected)
Contributor Author


These "unit tests" will be removed before merge.
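For context, a minimal sketch of a `parse_accept_encoding` that satisfies these tests (the actual implementation in the PR may differ):

```python
def parse_accept_encoding(value):
    """Parse an Accept-Encoding header value into (coding, qvalue) pairs.

    qvalue is None when no q parameter is present. Whitespace around
    tokens, ';', and '=' is tolerated, per the test cases above.
    """
    result = []
    for part in value.split(","):
        part = part.strip()
        if not part:
            continue
        name, _, params = part.partition(";")
        q = None
        if params:
            key, _, qval = params.partition("=")
            if key.strip() == "q":
                try:
                    q = float(qval.strip())
                except ValueError:
                    raise ValueError(f"invalid qvalue in {part!r}")
        result.append((name.strip(), q))
    return result
```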

@ianmcook
Member

ianmcook commented Sep 7, 2024

Thanks! At quick glance this looks great so far. I'll take a closer look soon.

@felipecrv
Contributor Author

felipecrv commented Sep 10, 2024

Stats when running the server.py/client.py pair on the same M1 Pro macbook:

$ python client.py
[identity]: Requesting data from http://127.0.0.1:8008 with `identity` encoding.
[identity]: Schema received in 0.008 seconds. schema=(ticker, price, volume).
[identity]: First batch of 6836 received and processed in 0.008 seconds
[identity]: Processing of all batches completed in 0.209 seconds.
    [zstd]: Requesting data from http://127.0.0.1:8008 with `zstd` encoding.
    [zstd]: Schema received in 0.005 seconds. schema=(ticker, price, volume).
    [zstd]: First batch of 6836 received and processed in 0.005 seconds
    [zstd]: Processing of all batches completed in 2.418 seconds.
      [br]: Requesting data from http://127.0.0.1:8008 with `br` encoding.
      [br]: Schema received in 0.103 seconds. schema=(ticker, price, volume).
      [br]: First batch of 6836 received and processed in 0.103 seconds
      [br]: Processing of all batches completed in 7.650 seconds.
    [gzip]: Requesting data from http://127.0.0.1:8008 with `gzip` encoding.
    [gzip]: Schema received in 0.045 seconds. schema=(ticker, price, volume).
    [gzip]: First batch of 6836 received and processed in 0.045 seconds
    [gzip]: Processing of all batches completed in 48.114 seconds.

The uncompressed response size is almost 1GB. I think brotli is achieving a really high compression ratio here because the batches of data are random slices of the same base array.

output.arrows         943M 
output.arrows.br       36M 
output.arrows.gz      344M 
output.arrows.zstd    336M 

From one laptop to another on my home Wi-Fi and 1/10 of the records:

$ python client.py
[identity]: Requesting data from http://192.168.68.103:8008 with `identity` encoding.
[identity]: Schema received in 0.110 seconds. schema=(ticker, price, volume).
[identity]: First batch of 684 received and processed in 0.133 seconds
[identity]: Processing of all batches completed in 17.448 seconds.
    [zstd]: Requesting data from http://192.168.68.103:8008 with `zstd` encoding.
    [zstd]: Schema received in 0.023 seconds. schema=(ticker, price, volume).
    [zstd]: First batch of 684 received and processed in 0.032 seconds
    [zstd]: Processing of all batches completed in 6.133 seconds.
      [br]: Requesting data from http://192.168.68.103:8008 with `br` encoding.
      [br]: Schema received in 0.118 seconds. schema=(ticker, price, volume).
      [br]: First batch of 684 received and processed in 0.118 seconds
      [br]: Processing of all batches completed in 1.096 seconds.
    [gzip]: Requesting data from http://192.168.68.103:8008 with `gzip` encoding.
    [gzip]: Schema received in 0.203 seconds. schema=(ticker, price, volume).
    [gzip]: First batch of 684 received and processed in 0.203 seconds
    [gzip]: Processing of all batches completed in 6.294 seconds.

@felipecrv
Contributor Author

felipecrv commented Sep 10, 2024

We can make Brotli less impressive by feeding it more random data:

output2.arrows        945M
output2.arrows.br     264M (brotli still a winner)
output2.arrows.gz     344M (gzip now wins on compression)
output2.arrows.zstd   371M (Zstd is good)

(generated batches with random values instead of simply slicing from a big array)

--- a/http/get_compressed/python/server/server.py
+++ b/http/get_compressed/python/server/server.py
@@ -72,13 +72,14 @@ def example_batches(tickers):
     total_records = 42_000_000
     batch_len = 6 * 1024
     # all the batches sent are random slices of the larger base batch
-    base_batch = example_batch(tickers, length=8 * batch_len)
+    # base_batch = example_batch(tickers, length=8 * batch_len)
     batches = []
     records = 0
     while records < total_records:
         length = min(batch_len, total_records - records)
-        offset = randint(0, base_batch.num_rows - length - 1)
-        batch = base_batch.slice(offset, length)
+        # offset = randint(0, base_batch.num_rows - length - 1)
+        # batch = base_batch.slice(offset, length)
+        batch = example_batch(tickers, length)
         batches.append(batch)
         records += length
     return batches

What is the CPU overhead though? All requests are over 127.0.0.1 so CPU cost should dominate. And zstd wins big even though it doesn't produce the smallest possible response like brotli.

$ python client.py
[identity]: Requesting data from http://127.0.0.1:8008 with `identity` encoding.
[identity]: Schema received in 0.008 seconds. schema=(ticker, price, volume).
[identity]: First batch of 6836 received and processed in 0.009 seconds
[identity]: Processing of all batches completed in 0.238 seconds.
    [zstd]: Requesting data from http://127.0.0.1:8008 with `zstd` encoding.
    [zstd]: Schema received in 0.004 seconds. schema=(ticker, price, volume).
    [zstd]: First batch of 6836 received and processed in 0.004 seconds
    [zstd]: Processing of all batches completed in 2.613 seconds.
      [br]: Requesting data from http://127.0.0.1:8008 with `br` encoding.
      [br]: Schema received in 0.512 seconds. schema=(ticker, price, volume).
      [br]: First batch of 6836 received and processed in 0.512 seconds
      [br]: Processing of all batches completed in 52.460 seconds.
    [gzip]: Requesting data from http://127.0.0.1:8008 with `gzip` encoding.
    [gzip]: Schema received in 0.044 seconds. schema=(ticker, price, volume).
    [gzip]: First batch of 6836 received and processed in 0.044 seconds
    [gzip]: Processing of all batches completed in 47.742 seconds.

Now with the server running on a different laptop and loading the response over Wi-Fi and 1/10 of the data. Zstd is still the winner.

$ python client.py
[identity]: Requesting data from http://192.168.68.103:8008 with `identity` encoding.
[identity]: Schema received in 0.112 seconds. schema=(ticker, price, volume).
[identity]: First batch of 684 received and processed in 0.140 seconds
[identity]: Processing of all batches completed in 20.580 seconds.
    [zstd]: Requesting data from http://192.168.68.103:8008 with `zstd` encoding.
    [zstd]: Schema received in 0.035 seconds. schema=(ticker, price, volume).
    [zstd]: First batch of 684 received and processed in 0.037 seconds
    [zstd]: Processing of all batches completed in 7.440 seconds.
      [br]: Requesting data from http://192.168.68.103:8008 with `br` encoding.
      [br]: Schema received in 0.542 seconds. schema=(ticker, price, volume).
      [br]: First batch of 684 received and processed in 0.542 seconds
      [br]: Processing of all batches completed in 9.821 seconds.
    [gzip]: Requesting data from http://192.168.68.103:8008 with `gzip` encoding.
    [gzip]: Schema received in 0.059 seconds. schema=(ticker, price, volume).
    [gzip]: First batch of 684 received and processed in 0.060 seconds
    [gzip]: Processing of all batches completed in 6.947 seconds.

@pitrou
Member

pitrou commented Sep 10, 2024

Arrow IPC buffer compression is probably preferable to HTTP compression.

@felipecrv
Contributor Author

Arrow IPC buffer compression is probably preferable to HTTP compression.

Why?

@pitrou
Member

pitrou commented Sep 10, 2024

Because it only supports modern compressors with extremely high decompression speed (lz4, zstd).
There may also be a latency advantage as metadata is not compressed, only data.

@felipecrv
Contributor Author

Because it only supports modern compressors with extremely high decompression speed (lz4, zstd).

Browsers support zstd already, so you only really fall back to slow gzip when nothing else is available. For a JavaScript app running in a browser, the only available compressor API is gzip, so by using zstd at the HTTP layer you get to use a modern compression algorithm.

There may also be a latency advantage as metadata is not compressed, only data.

I'm experimenting with dynamic buffer sizing and frame flushes that align with the HTTP chunk boundaries to improve latency. I can ensure metadata reaches the client as soon as possible. When the compressed stream comes from the network, the cost of decompressing a single block of the Zstd stream to get metadata is too small relative to the network latency.

Buffer-level compression can also increase latency by buffering the whole compressed buffer before producing the length and putting that on the stream.
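To illustrate the latency point: with stream-level compression a client can decompress incrementally as chunks arrive, so the schema message at the head of the stream is usable before the response completes. A stdlib-only sketch, with hypothetical chunk sizes:

```python
import gzip
import zlib

def iter_decompressed(chunks):
    """Incrementally decompress a gzip-encoded body, chunk by chunk."""
    d = zlib.decompressobj(wbits=zlib.MAX_WBITS | 16)  # +16 => gzip framing
    for chunk in chunks:
        out = d.decompress(chunk)
        if out:
            yield out  # bytes are usable before the full body arrives
    tail = d.flush()
    if tail:
        yield tail

# Simulate a chunked transfer: the first decompressed chunk (which would
# carry the Arrow schema message) is available after ~256 bytes arrive.
payload = b"schema message, then record batches... " * 500
compressed = gzip.compress(payload)
chunks_in = [compressed[i:i + 256] for i in range(0, len(compressed), 256)]
recovered = b"".join(iter_decompressed(chunks_in))
```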

@felipecrv felipecrv changed the title [WORK-IN-PROGRESS] http: Compressed response example in Python http: Compressed response example in Python Sep 10, 2024
@felipecrv felipecrv marked this pull request as ready for review September 10, 2024 22:36
@felipecrv
Contributor Author

felipecrv commented Sep 12, 2024

I added the option to dictionary-encode a column in the compression example. Results are interesting.

From not dictionary-encoded to sharing the same dictionary of 60 strings in the ticker column:

                      before  after  change
output.arrows          941M   803M   -138M
output.arrows.gz       344M   247M    -97M
output.arrows.zstd     336M   205M   -131M
output.arrows.brotli    35M    39M     +4M

Interestingly, brotli compresses better when the data is not dictionary encoded.

@felipecrv
Contributor Author

felipecrv commented Sep 12, 2024

Now including dictionary encoding at data generation time (in all these cases) and adding IPC buffer compression.

# uncompressed
803M  out.arrows

# HTTP response compression    # IPC buffer compression
208M  out.arrows.zstd          220M  out.arrows+zstd
247M  out.arrows.gz            404M  out.arrows+lz4
 38M  out.arrows.br
Timings
[identity]: Requesting data from http://127.0.0.1:8008 with `identity` compression strategy.
[identity]: Schema received in 0.007 seconds. schema=(ticker, price, volume).
[identity]: First batch received and processed in 0.007 seconds
[identity]: Processing of all batches completed in 0.194 seconds.
[identity]: ReadStats(num_messages=6838, num_record_batches=6836, num_dictionary_batches=1, num_dictionary_deltas=0, num_replaced_dictionaries=0)
    [zstd]: Requesting data from http://127.0.0.1:8008 with `zstd` compression strategy.
    [zstd]: Schema received in 0.006 seconds. schema=(ticker, price, volume).
    [zstd]: First batch received and processed in 0.006 seconds
    [zstd]: Processing of all batches completed in 1.934 seconds.
    [zstd]: ReadStats(num_messages=6838, num_record_batches=6836, num_dictionary_batches=1, num_dictionary_deltas=0, num_replaced_dictionaries=0)
      [br]: Requesting data from http://127.0.0.1:8008 with `br` compression strategy.
      [br]: Schema received in 0.111 seconds. schema=(ticker, price, volume).
      [br]: First batch received and processed in 0.111 seconds
      [br]: Processing of all batches completed in 7.824 seconds.
      [br]: ReadStats(num_messages=6838, num_record_batches=6836, num_dictionary_batches=1, num_dictionary_deltas=0, num_replaced_dictionaries=0)
    [gzip]: Requesting data from http://127.0.0.1:8008 with `gzip` compression strategy.
    [gzip]: Schema received in 0.026 seconds. schema=(ticker, price, volume).
    [gzip]: First batch received and processed in 0.026 seconds
    [gzip]: Processing of all batches completed in 41.153 seconds.
    [gzip]: ReadStats(num_messages=6838, num_record_batches=6836, num_dictionary_batches=1, num_dictionary_deltas=0, num_replaced_dictionaries=0)
[identity+zstd]: Requesting data from http://127.0.0.1:8008 with `identity+zstd` compression strategy.
[identity+zstd]: Schema received in 0.001 seconds. schema=(ticker, price, volume).
[identity+zstd]: First batch received and processed in 0.001 seconds
[identity+zstd]: Processing of all batches completed in 0.180 seconds.
[identity+zstd]: ReadStats(num_messages=6838, num_record_batches=6836, num_dictionary_batches=1, num_dictionary_deltas=0, num_replaced_dictionaries=0)
[identity+lz4]: Requesting data from http://127.0.0.1:8008 with `identity+lz4` compression strategy.
[identity+lz4]: Schema received in 0.001 seconds. schema=(ticker, price, volume).
[identity+lz4]: First batch received and processed in 0.001 seconds
[identity+lz4]: Processing of all batches completed in 0.184 seconds.
[identity+lz4]: ReadStats(num_messages=6838, num_record_batches=6836, num_dictionary_batches=1, num_dictionary_deltas=0, num_replaced_dictionaries=0)

@ianmcook
Member

ianmcook commented Sep 17, 2024

@felipecrv would you say that the below are valid interpretations of the timings and compression ratios above?

  • Using Arrow IPC buffer compression introduces less decompression latency than using HTTP compression.
  • If it's not an option to use Arrow IPC buffer compression (e.g. because it's not implemented in the Arrow library you're using), then:
    • If the network is very fast and data transfer costs are not a concern at all, don't use any HTTP compression.
    • If the network is fairly fast and data transfer costs are not a major concern, zstd is often the best all-around balanced option (but YMMV so try it yourself in your real-world environment on a representative sample of datasets).
    • If the network is slower or data transfer costs are a major concern, try experimenting with other HTTP compression codecs.

@felipecrv
Contributor Author

  • Using Arrow IPC buffer compression introduces less decompression latency than using HTTP compression.

There might be a Python overhead in these HTTP compression examples because the buffer compression happens completely inside the C++ layer and the HTTP examples connect different pyarrow classes. This is still a merit of the IPC buffer compression since Python might be present on both client and server.

Buffer compression is really beneficial to the IPC stream parser. The numbers above look very good.

  • If it's not an option to use Arrow IPC buffer compression (e.g. because it's not implemented in the Arrow library you're using), then:

    • If the network is very fast and data transfer costs are not a concern at all, don't use any HTTP compression.

I would recommend zstd. The network has to be very fast and reliable for zstd to not be helpful.

  • If the network is fairly fast and data transfer costs are not a major concern, zstd is often the best all-around balanced option (but YMMV so try it yourself in your real-world environment on a representative sample of datasets).

I would emphasize the zstd recommendation more. Almost no network is as reliable as the loopback interface at 127.0.0.1 :D

  • If the network is slower or data transfer costs are a major concern, try experimenting with other HTTP compression codecs.

Indeed.

@ianmcook
Member

@felipecrv thanks — that all sounds good.

This is still a merit of the IPC buffer compression since Python might be present on both client and server.

Agreed. I think we only need to warn developers away from it when they don't control the application on the other end of the connection. (For example, if you're a SaaS service adding an Arrow-over-HTTP protocol.)

Another issue to consider is that if you are caching results in Arrow format and serving them to HTTP clients as opaque binary data, and you need to make this work with the lowest-common-denominator Arrow client library, then IPC buffer compression is a poor option for you.

@felipecrv
Contributor Author

and you need to make this work with the lowest-common-denominator Arrow client library, then IPC buffer compression is a poor option for you.

Not so clear cut, because you could use IPC buffer compression and decompress right before returning to save on cache storage space.

@ianmcook
Member

Not so clear cut, because you could use IPC buffer compression and decompress right before returning to save on cache storage space.

Perhaps... but do any Arrow implementations provide efficient, easy-to-use methods to convert an IPC stream with compressed buffers into an IPC stream with uncompressed buffers? This could be done simply enough with a small procedure in most of the Arrow implementations, but I'm not sure how efficient it would be.

@felipecrv
Contributor Author

Not so clear cut, because you could use IPC buffer compression and decompress right before returning to save on cache storage space.

Perhaps... but do any Arrow implementations provide efficient, easy-to-use methods to convert an IPC stream with compressed buffers into an IPC stream with uncompressed buffers? This could be done simply enough with a small procedure in most of the Arrow implementations, but I'm not sure how efficient it would be.

It would require an intermediate step. But if the cache exists to reduce the need for some expensive computation that produces the RecordBatch stream, even expensive decompression and re-serialization could pay off.

Member

@amoeba amoeba left a comment


This is really fantastic @felipecrv.

I left a couple of comments and a code suggestion but I had some high-level thoughts:

  • To follow the convention established in other examples in this repo, the server and client folders should get concise README.md files with instructions on how to prepare an environment to run the example server and client.
  • The current README has turned into a really useful guidance document which I think should live on the main website. If we did that, this README could be made into a short document like the others in this repo and could link to that guidance document.
  • The benchmarking you've done in PR comments may be one of the most useful exports of this work. I think that should get published somewhere.

@ianmcook
Member

Thanks for doing this @felipecrv !

A couple of requests:

  • Could you please move make_requests.sh to http/get_compressed/curl/client/?
  • Could you please add README.md files in http/get_compressed/python/client/, http/get_compressed/python/server/, and http/get_compressed/curl/client/? These can be really basic (similar to the ones in http/get_simple). They should briefly explain what's in each subdirectory and how to run it.

@felipecrv
Contributor Author

@amoeba wrote:

  • To follow the convention established in other examples in this repo, the server and client folders should get concise README.md files with instructions on how to prepare an environment to run the example server and client.

Could the convention be broken here, considering that readers will be better off reading the recommendations and picking a single compression algorithm to use in their own application?

  • The current README has turned into a really useful guidance document which I think should live on the main website. If we did that, this README could be made into a short document like the others in this repo and could link to that guidance document.

The goal of this repo is to serve as a staging area for content that will eventually move to the official docs.

  • The benchmarking you've done in PR comments may be one of the most useful exports of this work. I think that should get published somewhere.

Benchmarking compression is very tricky because it depends heavily on the distribution of the values, and using randomly generated data like I did here largely determines the compression ratio one gets. It's better for people to get a feel for how compression behaves on their own workloads and within their CPU and network bandwidth budgets.

@felipecrv
Contributor Author

felipecrv commented Nov 22, 2024

@ianmcook wrote:

  • Could you please move make_requests.sh to http/get_compressed/curl/client/?

Makes sense! Doing it now.

  • Could you please add README.md files in http/get_compressed/python/client/, http/get_compressed/python/server/, and http/get_compressed/curl/client/? These can be really basic (similar to the ones in http/get_simple). They should briefly explain what's in each subdirectory and how to run it.
  • http/get_compressed/python/client/README.md
  • http/get_compressed/python/server/README.md
  • http/get_compressed/curl/client/README.md

Development

Successfully merging this pull request may close these issues.

[Python] Create Python examples of HTTP GET Arrow client and server using HTTP compression
4 participants