diff --git a/cmd/qrank-builder/page_items.go b/cmd/qrank-builder/page_items.go index b620b48..7e10ca3 100644 --- a/cmd/qrank-builder/page_items.go +++ b/cmd/qrank-builder/page_items.go @@ -12,6 +12,7 @@ import ( "fmt" "os" "path/filepath" + "runtime" "slices" "strconv" "strings" @@ -106,27 +107,37 @@ func buildPageItems(ctx context.Context, site *WikiSite, dumps string) (string, } defer file.Close() + // Somewhat surprisingly, the Wikimedia dump of the page_props + // table is mostly sorted by page id, but there's occasional + // exceptions. For example, the file dewiki-20240601-page_props.sql.gz + // contains entries in non-sorted order. Therefore, we need to re-sort + // the page_items ourselves. + items := make(chan extsort.SortType, 10000) + config := extsort.DefaultConfig() + config.ChunkSize = 8 * 1024 * 1024 + config.NumWorkers = runtime.NumCPU() + sorter, sortedChan, errChan := extsort.New(items, PageItemFromBytes, PageItemLess, config) + group, groupCtx := errgroup.WithContext(ctx) - items := make(chan PageItem, 1024) - pageItems := make(chan PageItem, 1024) - pagePropsItems := make(chan PageItem, 1024) group.Go(func() error { - mergePageItems(pageItems, pagePropsItems, items) + defer close(items) + if err := readPageItemsFromPageProps(groupCtx, site, dumps, items); err != nil { + return err + } + if err := readPageItemsFromPage(groupCtx, site, dumps, items); err != nil { + return err + } return nil }) group.Go(func() error { - return readPageItemsFromPageProps(groupCtx, site, dumps, pagePropsItems) - }) - group.Go(func() error { - return readPageItemsFromPage(groupCtx, site, dumps, pageItems) - }) - group.Go(func() error { + sorter.Sort(groupCtx) zstdLevel := zstd.WithEncoderLevel(zstd.SpeedFastest) compressor, err := zstd.NewWriter(file, zstdLevel) if err != nil { return err } - for pi := range items { + for s := range sortedChan { + pi := s.(PageItem) var buf bytes.Buffer buf.WriteString(strconv.FormatUint(pi.Page, 10)) buf.WriteByte('\t') @@ -145,6 +156,9 @@ func buildPageItems(ctx context.Context, site *WikiSite, dumps string) (string, os.Remove(file.Name()) return "", err } + if err := <-errChan; err != nil { + return "", err + } return file.Name(), nil } @@ -152,9 +166,7 @@ func buildPageItems(ctx context.Context, site *WikiSite, dumps string) (string, // ReadPageItemsFromPageProps reads a stream of PageItems (which page // corresponds to what Wikidata item) from a site’s `page_props` table. // The results are streamed in order of increasing page ID. -func readPageItemsFromPageProps(ctx context.Context, site *WikiSite, dumps string, out chan<- PageItem) error { - defer close(out) - +func readPageItemsFromPageProps(ctx context.Context, site *WikiSite, dumps string, out chan<- extsort.SortType) error { ymd := site.LastDumped.Format("20060102") propsFileName := fmt.Sprintf("%s-%s-page_props.sql.gz", site.Key, ymd) propsPath := filepath.Join(dumps, site.Key, ymd, propsFileName) @@ -179,7 +191,6 @@ func readPageItemsFromPageProps(ctx context.Context, site *WikiSite, dumps strin pageCol := slices.Index(columns, "pp_page") nameCol := slices.Index(columns, "pp_propname") valueCol := slices.Index(columns, "pp_value") - lastPage := uint64(0) for { row, err := reader.Read() if err != nil { @@ -189,19 +200,10 @@ func readPageItemsFromPageProps(ctx context.Context, site *WikiSite, dumps strin return nil } - // Because the correctness of the pipeline depends on this, we verify - // that the Wikimedia dumps are indeed in the expected sort order. - // If this ever fails, we need to change our code to sort the items. - // It would be a small change, at the cost of being slower. page, err := strconv.ParseUint(row[pageCol], 10, 64) if err != nil || page == 0 { continue } - if page < lastPage { - return fmt.Errorf("%s should be sorted by page, but %d after %d", - propsFileName, page, lastPage) - } - lastPage = page value := row[valueCol] if row[nameCol] == "wikibase_item" { @@ -220,9 +222,7 @@ func readPageItemsFromPageProps(ctx context.Context, site *WikiSite, dumps strin // ReadPageItemsFromPageProps reads a stream of PageItems (which page // corresponds to what Wikidata item) from a site’s `page` table. // The results are streamed in order of increasing page ID. -func readPageItemsFromPage(ctx context.Context, site *WikiSite, dumps string, out chan<- PageItem) error { - defer close(out) - +func readPageItemsFromPage(ctx context.Context, site *WikiSite, dumps string, out chan<- extsort.SortType) error { // Other than other wiki projects, wikidatawiki.page_props only contains // Wikidata IDs for internal maintenance pages such as templates. To find // the mapping from page-id to wikidata-id for the actually interesting @@ -258,7 +258,6 @@ func readPageItemsFromPage(ctx context.Context, site *WikiSite, dumps string, ou pageCol := slices.Index(columns, "page_id") namespaceCol := slices.Index(columns, "page_namespace") titleCol := slices.Index(columns, "page_title") - lastPage := uint64(0) for { select { case <-ctx.Done(): @@ -274,19 +273,10 @@ func readPageItemsFromPage(ctx context.Context, site *WikiSite, dumps string, ou return nil } - // Because the correctness of the pipeline depends on this, we verify - // that the Wikimedia dumps are indeed in the expected sort order. - // If this ever fails, we need to change our code to sort the items. - // It would be a small change, at the cost of being slower. page, err := strconv.ParseUint(row[pageCol], 10, 64) if err != nil || page == 0 { continue } - if page < lastPage { - return fmt.Errorf("%s should be sorted by page, but %d after %d", - fileName, page, lastPage) - } - lastPage = page if row[namespaceCol] != "0" { continue @@ -301,27 +291,3 @@ func readPageItemsFromPage(ctx context.Context, site *WikiSite, dumps string, ou } } } - -// MergePageItems merges two sorted PageItem channels. -func mergePageItems(a <-chan PageItem, b <-chan PageItem, out chan<- PageItem) { - defer close(out) - valA, okA := <-a - valB, okB := <-b - for okA && okB { - if PageItemLess(valA, valB) { - out <- valA - valA, okA = <-a - } else { - out <- valB - valB, okB = <-b - } - } - for okA { - out <- valA - valA, okA = <-a - } - for okB { - out <- valB - valB, okB = <-b - } -}