Skip to content

Commit

Permalink
Genesis export OOM (#58)
Browse files Browse the repository at this point in the history
* debug

* debug

* debug

* debug

* print out largest size for code

* print out addr for large state sizes

* flush wasm contracts  when reaching state kv len threshold reached

* wasm flush every kv pair

* go back to flush after threshold

* validate-genesis using streaming works

* have params on every line in wasm genesis

* export, validate, import work locally

* cleanup

* change name of StreamGenesis to ExportGenesisStream

* remove some prints

* fix
  • Loading branch information
jewei1997 authored Jul 29, 2024
1 parent af0a8e2 commit 42f56f7
Show file tree
Hide file tree
Showing 4 changed files with 183 additions and 1 deletion.
1 change: 1 addition & 0 deletions x/wasm/alias.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ var (
DefaultParams = types.DefaultParams
InitGenesis = keeper.InitGenesis
ExportGenesis = keeper.ExportGenesis
ExportGenesisStream = keeper.ExportGenesisStream
NewMessageHandler = keeper.NewDefaultMessageHandler
DefaultEncoders = keeper.DefaultEncoders
EncodeBankMsg = keeper.EncodeBankMsg
Expand Down
73 changes: 73 additions & 0 deletions x/wasm/keeper/genesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,3 +126,76 @@ func ExportGenesis(ctx sdk.Context, keeper *Keeper) *types.GenesisState {

return &genState
}

const GENSIS_STATE_STREAM_BUF_THRESHOLD = 50000

func ExportGenesisStream(ctx sdk.Context, keeper *Keeper) <-chan *types.GenesisState {
ch := make(chan *types.GenesisState)
go func() {
var genState types.GenesisState
genState.Params = keeper.GetParams(ctx)
ch <- &genState

// Needs to be first because there are invariant checks when importing that need sequences info
for _, k := range [][]byte{types.KeyLastCodeID, types.KeyLastInstanceID} {
var genState types.GenesisState
genState.Params = keeper.GetParams(ctx)
genState.Sequences = append(genState.Sequences, types.Sequence{
IDKey: k,
Value: keeper.PeekAutoIncrementID(ctx, k),
})
ch <- &genState
}

keeper.IterateCodeInfos(ctx, func(codeID uint64, info types.CodeInfo) bool {
var genState types.GenesisState
genState.Params = keeper.GetParams(ctx)
bytecode, err := keeper.GetByteCode(ctx, codeID)
if err != nil {
panic(err)
}
genState.Codes = append(genState.Codes, types.Code{
CodeID: codeID,
CodeInfo: info,
CodeBytes: bytecode,
Pinned: keeper.IsPinnedCode(ctx, codeID),
})
ch <- &genState
return false
})

keeper.IterateContractInfo(ctx, func(addr sdk.AccAddress, contract types.ContractInfo) bool {
// redact contract info
contract.Created = nil
var state []types.Model
keeper.IterateContractState(ctx, addr, func(key, value []byte) bool {
state = append(state, types.Model{Key: key, Value: value})
if len(state) > GENSIS_STATE_STREAM_BUF_THRESHOLD {
var genState types.GenesisState
genState.Params = keeper.GetParams(ctx)
genState.Contracts = append(genState.Contracts, types.Contract{
ContractAddress: addr.String(),
ContractInfo: contract,
ContractState: state,
})
ch <- &genState
state = nil
}
return false
})
// flush any remaining state
var genState types.GenesisState
genState.Params = keeper.GetParams(ctx)
genState.Contracts = append(genState.Contracts, types.Contract{
ContractAddress: addr.String(),
ContractInfo: contract,
ContractState: state,
})
ch <- &genState
return false
})

close(ch)
}()
return ch
}
40 changes: 39 additions & 1 deletion x/wasm/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,31 @@ func (b AppModuleBasic) ValidateGenesis(marshaler codec.JSONCodec, config client
return ValidateGenesis(data)
}

func (am AppModuleBasic) ValidateGenesisStream(cdc codec.JSONCodec, config client.TxEncodingConfig, genesisCh <-chan json.RawMessage) error {
var err error
doneCh := make(chan struct{})
genesisStateCh := make(chan GenesisState)
go func() {
err = types.ValidateGenesisStream(genesisStateCh)
doneCh <- struct{}{}
}()
go func() {
defer close(genesisStateCh)
for genesis := range genesisCh {
var data GenesisState
err_ := cdc.UnmarshalJSON(genesis, &data)
if err_ != nil {
err = err_
doneCh <- struct{}{}
return
}
genesisStateCh <- data
}
}()
<-doneCh
return err
}

// RegisterRESTRoutes registers the REST routes for the wasm module.
func (AppModuleBasic) RegisterRESTRoutes(cliCtx client.Context, rtr *mux.Router) {
rest.RegisterRoutes(cliCtx, rtr)
Expand Down Expand Up @@ -170,7 +195,20 @@ func (am AppModule) InitGenesis(ctx sdk.Context, cdc codec.JSONCodec, data json.
// module.
func (am AppModule) ExportGenesis(ctx sdk.Context, cdc codec.JSONCodec) json.RawMessage {
gs := ExportGenesis(ctx, am.keeper)
return cdc.MustMarshalJSON(gs)
marshalled := cdc.MustMarshalJSON(gs)
return marshalled
}

func (am AppModule) ExportGenesisStream(ctx sdk.Context, cdc codec.JSONCodec) <-chan json.RawMessage {
ch := ExportGenesisStream(ctx, am.keeper)
chRaw := make(chan json.RawMessage)
go func() {
for genState := range ch {
chRaw <- cdc.MustMarshalJSON(genState)
}
close(chRaw)
}()
return chRaw
}

// BeginBlock returns the begin blocker for the wasm module.
Expand Down
70 changes: 70 additions & 0 deletions x/wasm/types/genesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,33 @@ func (s GenesisState) ValidateBasic() error {
return nil
}

func (s GenesisState) ValidateBasicStream(dataCh chan GenesisState) error {
if err := s.Params.ValidateBasic(); err != nil {
return sdkerrors.Wrap(err, "params")
}
for i := range s.Codes {
if err := s.Codes[i].ValidateBasic(); err != nil {
return sdkerrors.Wrapf(err, "code: %d", i)
}
}
for i := range s.Contracts {
if err := s.Contracts[i].ValidateBasic(); err != nil {
return sdkerrors.Wrapf(err, "contract: %d", i)
}
}
for i := range s.Sequences {
if err := s.Sequences[i].ValidateBasic(); err != nil {
return sdkerrors.Wrapf(err, "sequence: %d", i)
}
}
for i := range s.GenMsgs {
if err := s.GenMsgs[i].ValidateBasic(); err != nil {
return sdkerrors.Wrapf(err, "gen message: %d", i)
}
}
return nil
}

func (c Code) ValidateBasic() error {
if c.CodeID == 0 {
return sdkerrors.Wrap(ErrEmpty, "code id")
Expand Down Expand Up @@ -100,6 +127,49 @@ func ValidateGenesis(data GenesisState) error {
return data.ValidateBasic()
}

// ValidateGenesisStream performs basic validation of wasm genesis data over a stream
// of wasm genesis states. It needs to pass the params validation at least one chunk and
// other checks on every chunk of the stream.
func ValidateGenesisStream(genesisStateCh <-chan GenesisState) error {
passedParamsCheck := false
var paramCheckErr error
var otherErr error
for s := range genesisStateCh {
if otherErr != nil {
continue
}
if err := s.Params.ValidateBasic(); err != nil {
paramCheckErr = sdkerrors.Wrap(err, "params")
} else {
passedParamsCheck = true
}
for i := range s.Codes {
if err := s.Codes[i].ValidateBasic(); err != nil {
otherErr = sdkerrors.Wrapf(err, "code: %d", i)
}
}
for i := range s.Contracts {
if err := s.Contracts[i].ValidateBasic(); err != nil {
otherErr = sdkerrors.Wrapf(err, "contract: %d", i)
}
}
for i := range s.Sequences {
if err := s.Sequences[i].ValidateBasic(); err != nil {
otherErr = sdkerrors.Wrapf(err, "sequence: %d", i)
}
}
for i := range s.GenMsgs {
if err := s.GenMsgs[i].ValidateBasic(); err != nil {
otherErr = sdkerrors.Wrapf(err, "gen message: %d", i)
}
}
}
if !passedParamsCheck {
return paramCheckErr
}
return otherErr
}

var _ codectypes.UnpackInterfacesMessage = GenesisState{}

// UnpackInterfaces implements codectypes.UnpackInterfaces
Expand Down

0 comments on commit 42f56f7

Please sign in to comment.