diff --git a/include/mysql/components/services/clone_protocol_service.h b/include/mysql/components/services/clone_protocol_service.h index 95e6354e384c..e33a37080d9d 100644 --- a/include/mysql/components/services/clone_protocol_service.h +++ b/include/mysql/components/services/clone_protocol_service.h @@ -61,6 +61,8 @@ struct mysql_clone_ssl_context { /** Enable network compression. */ bool m_enable_compression; + const char *m_compression_algorithm; + uint m_compression_level; NET_SERVER *m_server_extn; }; diff --git a/mysql-test/suite/rocksdb_clone/r/compression_config.result b/mysql-test/suite/rocksdb_clone/r/compression_config.result new file mode 100644 index 000000000000..1f498780c503 --- /dev/null +++ b/mysql-test/suite/rocksdb_clone/r/compression_config.result @@ -0,0 +1,23 @@ +INSTALL PLUGIN clone SONAME 'CLONE_PLUGIN'; +SET GLOBAL clone_enable_compression = ON; +SET GLOBAL clone_compression_algorithm = ZSTD; +SET GLOBAL clone_zstd_compression_level = 8; +SET GLOBAL clone_autotune_concurrency = OFF; +SET GLOBAL clone_max_concurrency = 8; +SET GLOBAL clone_valid_donor_list = 'HOST:PORT'; +CLONE INSTANCE FROM USER@HOST:PORT IDENTIFIED BY '' DATA DIRECTORY = 'CLONE_DATADIR'; +select ID, STATE, ERROR_NO from performance_schema.clone_status; +ID STATE ERROR_NO +1 Completed 0 +select ID, STAGE, STATE from performance_schema.clone_progress; +ID STAGE STATE +1 DROP DATA Completed +1 FILE COPY Completed +1 PAGE COPY Completed +1 SST COPY Completed +1 REDO COPY Completed +1 FILE SYNC Completed +1 RESTART Not Started +1 RECOVERY Not Started +# restart +UNINSTALL PLUGIN clone; diff --git a/mysql-test/suite/rocksdb_clone/t/compression_config.test b/mysql-test/suite/rocksdb_clone/t/compression_config.test new file mode 100644 index 000000000000..9d50d063863c --- /dev/null +++ b/mysql-test/suite/rocksdb_clone/t/compression_config.test @@ -0,0 +1,28 @@ +--source include/have_rocksdb.inc + +--let $HOST = 127.0.0.1 +--let $PORT =`select @@port` +--let $USER = root +--let remote_clone = 1 + +--source ../../clone/include/clone_connection_begin.inc +--let $CLONE_DATADIR = $MYSQL_TMP_DIR/data_new + +# Install Clone Plugin +--replace_result $CLONE_PLUGIN CLONE_PLUGIN +--eval INSTALL PLUGIN clone SONAME '$CLONE_PLUGIN' + +--connection clone_conn_1 +SET GLOBAL clone_enable_compression = ON; +SET GLOBAL clone_compression_algorithm = ZSTD; +SET GLOBAL clone_zstd_compression_level = 8; +--source ../../clone/include/clone_command.inc + +--force-rmdir $CLONE_DATADIR + +--let restart_parameters= +--source include/restart_mysqld.inc + +# Clean up +UNINSTALL PLUGIN clone; +--source ../../clone/include/clone_connection_end.inc diff --git a/plugin/clone/include/clone.h b/plugin/clone/include/clone.h index f831b20e5871..6cd760ce0de8 100644 --- a/plugin/clone/include/clone.h +++ b/plugin/clone/include/clone.h @@ -90,6 +90,10 @@ extern uint clone_max_io_bandwidth; /** Clone system variable: If network compression is enabled */ extern bool clone_enable_compression; +extern ulong clone_compression_algorithm; + +extern uint clone_zstd_compression_level; + /** Clone system variable: SSL private key */ extern char *clone_client_ssl_private_key; @@ -111,6 +115,9 @@ const uint CLONE_MIN_BLOCK = 1024 * 1024; /** Minimum network packet. Safe margin for meta information */ const uint CLONE_MIN_NET_BLOCK = 2 * CLONE_MIN_BLOCK; +/** Clone compression algorithms to use */ +extern const char *clone_compression_algorithms[3]; + /* Namespace for all clone data types */ namespace myclone { diff --git a/plugin/clone/src/clone_client.cc b/plugin/clone/src/clone_client.cc index 20f8454c78f4..52bd8002bda7 100644 --- a/plugin/clone/src/clone_client.cc +++ b/plugin/clone/src/clone_client.cc @@ -27,6 +27,7 @@ Clone Plugin: Client implementation */ #include +#include "plugin/clone/include/clone.h" #include "plugin/clone/include/clone_client.h" #include "plugin/clone/include/clone_os.h" @@ -896,6 +897,13 @@ int Client::connect_remote(bool is_restart, bool use_aux) { mysql_clone_ssl_context ssl_context; ssl_context.m_enable_compression = clone_enable_compression; + + if (ssl_context.m_enable_compression) { + ssl_context.m_compression_algorithm = + clone_compression_algorithms[clone_compression_algorithm]; + ssl_context.m_compression_level = clone_zstd_compression_level; + } + ssl_context.m_server_extn = ssl_context.m_enable_compression ? &m_conn_server_extn : nullptr; ssl_context.m_ssl_mode = m_share->m_ssl_mode; diff --git a/plugin/clone/src/clone_plugin.cc b/plugin/clone/src/clone_plugin.cc index fdbf543cb840..108972a90179 100644 --- a/plugin/clone/src/clone_plugin.cc +++ b/plugin/clone/src/clone_plugin.cc @@ -70,6 +70,10 @@ uint clone_max_io_bandwidth; /** Clone system variable: If network compression is enabled */ bool clone_enable_compression; +ulong clone_compression_algorithm; + +uint clone_zstd_compression_level; + /** Clone system variable: valid list of donor addresses. */ static char *clone_valid_donor_list; @@ -636,6 +640,27 @@ static MYSQL_SYSVAR_BOOL(enable_compression, clone_enable_compression, "If compression is done at network", nullptr, nullptr, false); /* Disable compression by default */ +enum enum_clone_compression_algorithm { ZLIB = 0, ZSTD }; +const char *clone_compression_algorithms[] = {"ZLIB", "ZSTD", NullS}; + +static TYPELIB clone_compression_algorithms_typelib = { + array_elements(clone_compression_algorithms) - 1, + "clone_compression_algorithms_typelib", + clone_compression_algorithms, nullptr}; + +static MYSQL_SYSVAR_ENUM(compression_algorithm, clone_compression_algorithm, + PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_MEMALLOC, + "compression algorithm used in clone", nullptr, + nullptr, enum_clone_compression_algorithm::ZSTD, + &clone_compression_algorithms_typelib); + +static MYSQL_SYSVAR_UINT(zstd_compression_level, clone_zstd_compression_level, + PLUGIN_VAR_NOCMDARG, "zstd compression level", nullptr, + nullptr, 3, /* Default */ + 1, /* Minimum */ + 10, /* Maximum */ + 1); + /** List of valid donor addresses allowed to clone from. */ static MYSQL_SYSVAR_STR(valid_donor_list, clone_valid_donor_list, PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_MEMALLOC, @@ -705,6 +730,8 @@ static SYS_VAR *clone_system_variables[] = { MYSQL_SYSVAR(ssl_cert), MYSQL_SYSVAR(ssl_ca), MYSQL_SYSVAR(donor_timeout_after_network_failure), + MYSQL_SYSVAR(compression_algorithm), + MYSQL_SYSVAR(zstd_compression_level), nullptr}; /** Declare clone plugin */ diff --git a/sql/server_component/clone_protocol_service.cc b/sql/server_component/clone_protocol_service.cc index b283d5fd8359..05c1eef5db7a 100644 --- a/sql/server_component/clone_protocol_service.cc +++ b/sql/server_component/clone_protocol_service.cc @@ -394,6 +394,12 @@ DEFINE_METHOD(MYSQL *, mysql_clone_connect, /* Enable compression. */ if (ssl_ctx->m_enable_compression) { mysql_options(mysql, MYSQL_OPT_COMPRESS, nullptr); + if (ssl_ctx->m_compression_algorithm) { + mysql_options(mysql, MYSQL_OPT_COMPRESSION_ALGORITHMS, + ssl_ctx->m_compression_algorithm); + mysql_options(mysql, MYSQL_OPT_ZSTD_COMPRESSION_LEVEL, + &ssl_ctx->m_compression_level); + } mysql_extension_set_server_extn(mysql, ssl_ctx->m_server_extn); } @@ -539,6 +545,13 @@ DEFINE_METHOD(int, mysql_clone_get_response, *net_length = 0; *length = my_net_read(net); + /** The ZSTD object has been updated to include the newly created decompressor + * context. Include the change in the net extension so that the resource can + * be released when de-init compress context. */ + if (net->compress && server_extn.compress_ctx.algorithm == MYSQL_ZSTD) { + static_cast(saved_extn)->compress_ctx.u.zstd_ctx = + static_cast(net->extension)->compress_ctx.u.zstd_ctx; + } net->extension = saved_extn; server_extn.compress_ctx.algorithm = MYSQL_UNCOMPRESSED;