diff --git a/cmd/otelbench/dump.go b/cmd/otelbench/dump.go index 4b247cef..1ad37f7e 100644 --- a/cmd/otelbench/dump.go +++ b/cmd/otelbench/dump.go @@ -22,17 +22,33 @@ func newDumpCommand() *cobra.Command { } rootCmd.AddCommand( newDumpCreateCommand(), + newDumpRestoreCommand(), ) return rootCmd } +func dumpTables() []string { + return []string{ + "migration", + "metrics_exemplars", + "metrics_exp_histograms", + "metrics_labels", + "metrics_points", + "traces_spans", + "traces_tags", + "logs_attrs", + "logs", + } +} + func newDumpCreateCommand() *cobra.Command { var arg struct { LimitTime time.Duration LimitCount int - Output string - Database string + Output string + Database string + Compression string KubernetesNamespace string KubernetesService string @@ -113,17 +129,7 @@ func newDumpCreateCommand() *cobra.Command { } } fmt.Println("Clickhouse connection is ready") - tables := []string{ - "migration", - "metrics_exemplars", - "metrics_exp_histograms", - "metrics_labels", - "metrics_points", - "traces_spans", - "traces_tags", - "logs_attrs", - "logs", - } + tables := dumpTables() var files []os.FileInfo for _, table := range tables { query := fmt.Sprintf("SELECT * FROM %s.%s", arg.Database, table) @@ -143,8 +149,8 @@ func newDumpCreateCommand() *cobra.Command { } // SELECT * FROM faster.logs INTO OUTFILE '/tmp/dump.bin' FORMAT Native; - outFile := filepath.Join(arg.Output, fmt.Sprintf("%s.bin.lz4", table)) - query += fmt.Sprintf(" INTO OUTFILE '%s' TRUNCATE COMPRESSION 'lz4' FORMAT Native", outFile) + outFile := filepath.Join(arg.Output, fmt.Sprintf("%s.bin.%s", table, arg.Compression)) + query += fmt.Sprintf(" INTO OUTFILE '%s' TRUNCATE COMPRESSION '%s' FORMAT Native", outFile, arg.Compression) args := []string{ "-h", "localhost", @@ -195,6 +201,95 @@ func newDumpCreateCommand() *cobra.Command { f.IntVar(&arg.LocalPort, "port", 9000, "Local port") f.DurationVar(&arg.LimitTime, "duration", 0, "Limit oldest data with delta from now") f.IntVar(&arg.LimitCount, "limit", 0, "Limit oldest data with count") + f.StringVar(&arg.Compression, "compression", "zstd", "Compression algorithm") + + return rootCmd +} + +func newDumpRestoreCommand() *cobra.Command { + var arg struct { + Input string + Database string + Host string + Port int + Truncate bool + Compression string + } + rootCmd := &cobra.Command{ + Use: "restore", + Short: "Restore a dump", + RunE: func(cobraCommand *cobra.Command, _ []string) error { + ctx := cobraCommand.Context() + client, err := ch.Dial(ctx, ch.Options{ + Address: fmt.Sprintf("%s:%d", arg.Host, arg.Port), + Database: arg.Database, + }) + if err != nil { + return errors.Wrap(err, "dial") + } + + truncate := func(ctx context.Context, table string) error { + ctx, cancel := context.WithTimeout(ctx, time.Minute) + defer cancel() + + query := fmt.Sprintf("TRUNCATE TABLE %s.%s", arg.Database, table) + if err := client.Do(ctx, ch.Query{Body: query}); err != nil { + return errors.Wrap(err, "truncate") + } + + return nil + } + + tables := dumpTables() + for _, table := range tables { + file := filepath.Join(arg.Input, fmt.Sprintf("%s.bin.%s", table, arg.Compression)) + if _, err := os.Stat(file); err != nil { + if os.IsNotExist(err) { + fmt.Printf("Table %s not found\n", table) + continue + } + return errors.Wrap(err, "stat") + } + if arg.Truncate { + if err := truncate(ctx, table); err != nil { + return errors.Wrapf(err, "truncate %s", table) + } + } + + if err := func() error { + q := fmt.Sprintf("INSERT INTO %s.%s FROM INFILE '%s' COMPRESSION '%s' FORMAT Native", + arg.Database, table, file, arg.Compression, + ) + args := []string{ + "-h", arg.Host, + "-d", arg.Database, + "--progress", + "-q", q, + } + cmd := exec.CommandContext(ctx, "clickhouse-client", args...) + cmd.Stderr = os.Stderr + cmd.Stdout = os.Stdout + + fmt.Println(">", table) + if err := cmd.Run(); err != nil { + return errors.Wrapf(err, "restore %s", table) + } + + return nil + }(); err != nil { + return errors.Wrapf(err, "restore %s", file) + } + } + return nil + }, + } + + f := rootCmd.Flags() + f.StringVarP(&arg.Input, "input", "i", "", "Input directory") + f.StringVarP(&arg.Database, "database", "d", "default", "Database name") + f.IntVar(&arg.Port, "port", 9000, "Clickhouse port") + f.StringVar(&arg.Host, "host", "localhost", "Clickhouse host") + f.BoolVar(&arg.Truncate, "truncate", false, "Truncate tables before restore") return rootCmd }