
Integrate zstd compression into chain exchange #842

Open · masih wants to merge 2 commits into main from masih/chainex-zstd-compression
Conversation

@masih masih (Member) commented Jan 22, 2025

The GPBFT message exchange over pubsub already uses zstd compression on top of CBOR-encoded messages. The work here integrates the same style of compression for chain exchange messages and unifies the encoding mechanism across the two.

The work refactors the root-level encoding implementation into a generic encoder/decoder that both chain exchange and GPBFT use. Tests and benchmarks are updated to reflect this.

The benchmarking of partial GMessage encoding is also adjusted to fix a few redundant statements and bugs in testing.

Fixes #819, #843

@masih masih self-assigned this Jan 22, 2025
@masih masih requested a review from Kubuxu January 22, 2025 14:50
codecov bot commented Jan 22, 2025

Codecov Report

Attention: Patch coverage is 61.40351% with 22 lines in your changes missing coverage. Please review.

Project coverage is 67.32%. Comparing base (ded3d04) to head (13487e2).

| Files with missing lines | Patch % | Lines |
|---|---|---|
| internal/encoding/encoding.go | 61.90% | 11 Missing and 5 partials ⚠️ |
| chainexchange/pubsub.go | 44.44% | 2 Missing and 3 partials ⚠️ |
| host.go | 83.33% | 0 Missing and 1 partial ⚠️ |
Additional details and impacted files


```diff
@@            Coverage Diff             @@
##             main     #842      +/-   ##
==========================================
- Coverage   67.38%   67.32%   -0.07%     
==========================================
  Files          83       83              
  Lines        9017     9024       +7     
==========================================
- Hits         6076     6075       -1     
- Misses       2408     2412       +4     
- Partials      533      537       +4     
```
| Files with missing lines | Coverage Δ |
|---|---|
| host.go | 66.10% <83.33%> (+1.51%) ⬆️ |
| chainexchange/pubsub.go | 70.04% <44.44%> (+1.43%) ⬆️ |
| internal/encoding/encoding.go | 61.90% <61.90%> (ø) |

... and 6 files with indirect coverage changes

@Stebalien (Member)

Can we add a compression bomb test? I think we're fine, but it would be nice to have a test.

  • The fact that we're stream decoding into the CBOR decoder coupled with the fact that the CBOR decoder has a bunch of limits should protect us against memory issues.
  • The fact that our CBOR decoder won't read through unlimited input (e.g., it doesn't have logic that skips unknown fields) means we can't get "stuck" when decoding. But it would be nice if we could wrap the decompressed reader in a limited reader before passing it off to the CBOR decoder, just in case; see the sketch below. That way we can guarantee that we'll process at most N bytes of CBOR before giving up.
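For illustration, that guard might look like the following (a sketch only, assuming a streaming decode path; the names `maxCBORSize` and `decodeLimited` are hypothetical, not code from this PR):

```go
package encoding

import (
	"io"

	"github.com/klauspost/compress/zstd"
	cbg "github.com/whyrusleeping/cbor-gen"
)

// maxCBORSize caps how many decompressed bytes the CBOR decoder may consume.
// Hypothetical name; 1 MiB matches the pubsub message size limit discussed below.
const maxCBORSize = 1 << 20

// decodeLimited wraps the decompressed stream in an io.LimitReader so that
// at most maxCBORSize bytes of CBOR are processed before giving up.
func decodeLimited(compressed io.Reader, t cbg.CBORUnmarshaler) error {
	dec, err := zstd.NewReader(compressed)
	if err != nil {
		return err
	}
	defer dec.Close()
	return t.UnmarshalCBOR(io.LimitReader(dec, maxCBORSize))
}
```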

```go
}

func NewPubSubChainExchange(o ...Option) (*PubSubChainExchange, error) {
	opts, err := newOptions(o...)
	if err != nil {
		return nil, err
	}
	zstd, err := encoding.NewZSTD[*Message]()
```
Contributor:

Are we going with ZSTD by default?

@masih (Member, Author):

For chain exchange yes. For GPBFT it's configurable via manifest.

Happy to make it configurable for chain exchange too if you think it's worth doing.

@masih (Member, Author) commented Jan 22, 2025

> Can we add a compression bomb test?

Sure. Captured in #843.

@BigLep BigLep requested a review from Kubuxu January 23, 2025 02:11
@masih (Member, Author) commented Jan 23, 2025

@Stebalien the message encoding/decoding off Gossipsub does not operate in a streaming manner. Instead, both the publisher and the subscriber hand []byte slices to and from the pubsub subsystem.

The default maximum message size in pubsub is 1 MiB, which as far as I can tell has not been overridden in Lotus. This limit is large enough for the purposes of F3. So, what I have done in 05bc578 is to explicitly set the maximum decoded size in zstd to 1 MiB. For sanity, I have also restricted the encoder to refuse to encode values that would hit that limit.
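For illustration, the configuration amounts to something like this (a sketch against `github.com/klauspost/compress/zstd`; the constant and function names are placeholders, not the exact code in 05bc578):

```go
package encoding

import (
	"fmt"

	"github.com/klauspost/compress/zstd"
)

// maxMessageSize mirrors pubsub's default 1 MiB maximum message size.
const maxMessageSize = 1 << 20

func newLimitedZSTD() (*zstd.Encoder, *zstd.Decoder, error) {
	// Refuse to decode anything that would expand beyond 1 MiB.
	decompressor, err := zstd.NewReader(nil, zstd.WithDecoderMaxMemory(maxMessageSize))
	if err != nil {
		return nil, nil, err
	}
	compressor, err := zstd.NewWriter(nil)
	if err != nil {
		return nil, nil, err
	}
	return compressor, decompressor, nil
}

// encode refuses values that would hit the decode limit, so we can never
// produce a message our own decoder would reject.
func encode(compressor *zstd.Encoder, cborEncoded []byte) ([]byte, error) {
	if len(cborEncoded) > maxMessageSize {
		return nil, fmt.Errorf("encoded message exceeds %d bytes: %d", maxMessageSize, len(cborEncoded))
	}
	return compressor.EncodeAll(cborEncoded, nil), nil
}
```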

Does this cover your concern re expansion attack vector in the context of zstd compression?

@masih masih requested a review from Stebalien January 23, 2025 13:50
@Stebalien (Member)

That's good but isn't the zstd coder streaming? Looking at the docs in:

https://pkg.go.dev/github.com/klauspost/compress/zstd#WithDecoderMaxMemory

That restricts the max memory held at any point in time while streaming, not the max that can be read from the stream in total (unless I'm misreading it).

@masih (Member, Author) commented Jan 23, 2025

> That's good but isn't the zstd coder streaming?

It has two modes: streaming, where one needs to allocate a decoder per io.Reader, or non-streaming, where one constructs a decoder with a nil io.Reader and uses DecodeAll. The latter is what the implementation in this repo does. The rationale for this choice was reduced GC pressure.
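For reference, that non-streaming pattern boils down to roughly this (an illustrative sketch, not the repo's exact code):

```go
package encoding

import "github.com/klauspost/compress/zstd"

// newDecompressor constructs the decoder once, with a nil io.Reader, so it
// is used only via DecodeAll; no streaming is involved.
func newDecompressor() (*zstd.Decoder, error) {
	return zstd.NewReader(nil)
}

// decode decompresses a whole message in a single call. DecodeAll is safe
// for concurrent use, so one long-lived decoder serves all messages with no
// per-message decoder allocation, which is what keeps GC pressure down.
func decode(decompressor *zstd.Decoder, compressed []byte) ([]byte, error) {
	return decompressor.DecodeAll(compressed, nil)
}
```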

> (unless I'm misreading it)

The documentation reads: "WithDecoderMaxMemory allows to set a maximum decoded size for in-memory non-streaming operations or maximum window size for streaming operations. This can be used to control memory usage of potentially hostile content."

Right?

@Stebalien (Member)

> It has two modes: streaming, where one needs to allocate a decoder per io.Reader, or non-streaming, where one constructs a decoder with a nil io.Reader and uses DecodeAll. The latter is what the implementation in this repo does. [...]

Oh, I see. I assumed we used streaming decoding. Then yeah, it should work (and is actually required).

@masih masih linked an issue Jan 24, 2025 that may be closed by this pull request
masih added 2 commits January 24, 2025 17:16
The GPBFT message exchange over pubsub already uses zstd compression on
top of CBOR-encoded messages. The work here integrates the same style of
compression for chain exchange messages and unifies the encoding
mechanism across the two.

The work refactors the root-level encoding implementation into a generic
encoder/decoder that both chain exchange and GPBFT use. Tests and
benchmarks are updated to reflect this.

The benchmarking of partial GMessage encoding is also adjusted to fix a
few redundant statements and bugs in testing.

Fixes #819
The default message size limit in GossipSub is 1 MiB, which is unchanged
in Lotus. This means that, when decompressing values, we can never have
a valid compressed message that expands to larger than 1 MiB.

Set this limit explicitly in the zstd decoder.
@masih masih force-pushed the masih/chainex-zstd-compression branch from 05bc578 to 13487e2 on January 24, 2025 17:19
@masih (Member, Author) commented Jan 24, 2025

@Kubuxu @Stebalien Can I get a review on this PR please?

@Stebalien Stebalien (Member) left a comment

LGTM!

```go
}

func (c *ZSTD[T]) Decode(v []byte, t T) error {
	cborEncoded, err := c.decompressor.DecodeAll(v, make([]byte, 0, len(v)))
```
@Stebalien (Member):

Future change: we should use a buffer pool for these short-lived buffers (https://pkg.go.dev/sync#Pool). If we do that, we can also allocate these buffers with 1MiB capacities and use WithDecodeAllCapLimit to avoid any allocations.
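A rough sketch of what that future change could look like (illustrative only; assumes the 1 MiB limit discussed above and a decoder constructed with `zstd.WithDecodeAllCapLimit`):

```go
package encoding

import (
	"bytes"
	"sync"

	"github.com/klauspost/compress/zstd"
	cbg "github.com/whyrusleeping/cbor-gen"
)

const maxDecodedSize = 1 << 20 // 1 MiB, matching the pubsub message limit

// bufferPool recycles decode buffers pre-sized to the maximum decoded size.
var bufferPool = sync.Pool{
	New: func() any { return make([]byte, 0, maxDecodedSize) },
}

// decodePooled decompresses into a pooled buffer and unmarshals from it.
// With zstd.WithDecodeAllCapLimit set on the decoder, DecodeAll errors out
// rather than growing the destination past its capacity, so the pooled
// buffer is never reallocated and steady-state decoding allocates nothing.
func decodePooled(decompressor *zstd.Decoder, compressed []byte, t cbg.CBORUnmarshaler) error {
	buf := bufferPool.Get().([]byte)
	defer bufferPool.Put(buf) // t holds no references into buf after unmarshalling
	out, err := decompressor.DecodeAll(compressed, buf[:0])
	if err != nil {
		return err
	}
	return t.UnmarshalCBOR(bytes.NewReader(out))
}
```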


Successfully merging this pull request may close these issues.

  • Add compression bomb test (#843)
  • Integrate compression into chainexchange topic (#819)
3 participants