Skip to content

Commit

Permalink
Do not assume sort order of page_props table
Browse files Browse the repository at this point in the history
  • Loading branch information
brawer committed Jun 21, 2024
1 parent f37b7d6 commit 4d67f6b
Showing 1 changed file with 27 additions and 61 deletions.
88 changes: 27 additions & 61 deletions cmd/qrank-builder/page_items.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"fmt"
"os"
"path/filepath"
"runtime"
"slices"
"strconv"
"strings"
Expand Down Expand Up @@ -106,27 +107,37 @@ func buildPageItems(ctx context.Context, site *WikiSite, dumps string) (string,
}
defer file.Close()

// Somewhat surprisingly, the Wikimedia dump of the page_props
// table is mostly sorted by page id, but there's occasional
// exceptions. For example, the file dewiki-20240601-page_props.sql.gz
// contains entries in non-sorted order. Therefore, we need to re-sort
// the page_items ourselves.
items := make(chan extsort.SortType, 10000)
config := extsort.DefaultConfig()
config.ChunkSize = 8 * 1024 * 1024
config.NumWorkers = runtime.NumCPU()
sorter, sortedChan, errChan := extsort.New(items, PageItemFromBytes, PageItemLess, config)

group, groupCtx := errgroup.WithContext(ctx)
items := make(chan PageItem, 1024)
pageItems := make(chan PageItem, 1024)
pagePropsItems := make(chan PageItem, 1024)
group.Go(func() error {
mergePageItems(pageItems, pagePropsItems, items)
defer close(items)
if err := readPageItemsFromPageProps(groupCtx, site, dumps, items); err != nil {
return err
}
if err := readPageItemsFromPage(groupCtx, site, dumps, items); err != nil {
return err
}
return nil
})
group.Go(func() error {
return readPageItemsFromPageProps(groupCtx, site, dumps, pagePropsItems)
})
group.Go(func() error {
return readPageItemsFromPage(groupCtx, site, dumps, pageItems)
})
group.Go(func() error {
sorter.Sort(groupCtx)
zstdLevel := zstd.WithEncoderLevel(zstd.SpeedFastest)
compressor, err := zstd.NewWriter(file, zstdLevel)
if err != nil {
return err
}
for pi := range items {
for s := range sortedChan {
pi := s.(PageItem)
var buf bytes.Buffer
buf.WriteString(strconv.FormatUint(pi.Page, 10))
buf.WriteByte('\t')
Expand All @@ -145,16 +156,17 @@ func buildPageItems(ctx context.Context, site *WikiSite, dumps string) (string,
os.Remove(file.Name())
return "", err
}
if err := <-errChan; err != nil {
return "", err
}

return file.Name(), nil
}

// ReadPageItemsFromPageProps reads a stream of PageItems (which page
// corresponds to what Wikidata item) from a site’s `page_props` table.
// The results are streamed in order of increasing page ID.
func readPageItemsFromPageProps(ctx context.Context, site *WikiSite, dumps string, out chan<- PageItem) error {
defer close(out)

func readPageItemsFromPageProps(ctx context.Context, site *WikiSite, dumps string, out chan<- extsort.SortType) error {
ymd := site.LastDumped.Format("20060102")
propsFileName := fmt.Sprintf("%s-%s-page_props.sql.gz", site.Key, ymd)
propsPath := filepath.Join(dumps, site.Key, ymd, propsFileName)
Expand All @@ -179,7 +191,6 @@ func readPageItemsFromPageProps(ctx context.Context, site *WikiSite, dumps strin
pageCol := slices.Index(columns, "pp_page")
nameCol := slices.Index(columns, "pp_propname")
valueCol := slices.Index(columns, "pp_value")
lastPage := uint64(0)
for {
row, err := reader.Read()
if err != nil {
Expand All @@ -189,19 +200,10 @@ func readPageItemsFromPageProps(ctx context.Context, site *WikiSite, dumps strin
return nil
}

// Because the correctness of the pipeline depends on this, we verify
// that the Wikimedia dumps are indeed in the expected sort order.
// If this ever fails, we need to change our code to sort the items.
// It would be a small change, at the cost of being slower.
page, err := strconv.ParseUint(row[pageCol], 10, 64)
if err != nil || page == 0 {
continue
}
if page < lastPage {
return fmt.Errorf("%s should be sorted by page, but %d after %d",
propsFileName, page, lastPage)
}
lastPage = page

value := row[valueCol]
if row[nameCol] == "wikibase_item" {
Expand All @@ -220,9 +222,7 @@ func readPageItemsFromPageProps(ctx context.Context, site *WikiSite, dumps strin
// ReadPageItemsFromPageProps reads a stream of PageItems (which page
// corresponds to what Wikidata item) from a site’s `page` table.
// The results are streamed in order of increasing page ID.
func readPageItemsFromPage(ctx context.Context, site *WikiSite, dumps string, out chan<- PageItem) error {
defer close(out)

func readPageItemsFromPage(ctx context.Context, site *WikiSite, dumps string, out chan<- extsort.SortType) error {
// Other than other wiki projects, wikidatawiki.page_props only contains
// Wikidata IDs for internal maintenance pages such as templates. To find
// the mapping from page-id to wikidata-id for the actually interesting
Expand Down Expand Up @@ -258,7 +258,6 @@ func readPageItemsFromPage(ctx context.Context, site *WikiSite, dumps string, ou
pageCol := slices.Index(columns, "page_id")
namespaceCol := slices.Index(columns, "page_namespace")
titleCol := slices.Index(columns, "page_title")
lastPage := uint64(0)
for {
select {
case <-ctx.Done():
Expand All @@ -274,19 +273,10 @@ func readPageItemsFromPage(ctx context.Context, site *WikiSite, dumps string, ou
return nil
}

// Because the correctness of the pipeline depends on this, we verify
// that the Wikimedia dumps are indeed in the expected sort order.
// If this ever fails, we need to change our code to sort the items.
// It would be a small change, at the cost of being slower.
page, err := strconv.ParseUint(row[pageCol], 10, 64)
if err != nil || page == 0 {
continue
}
if page < lastPage {
return fmt.Errorf("%s should be sorted by page, but %d after %d",
fileName, page, lastPage)
}
lastPage = page

if row[namespaceCol] != "0" {
continue
Expand All @@ -301,27 +291,3 @@ func readPageItemsFromPage(ctx context.Context, site *WikiSite, dumps string, ou
}
}
}

// MergePageItems merges two sorted PageItem channels.
func mergePageItems(a <-chan PageItem, b <-chan PageItem, out chan<- PageItem) {
defer close(out)
valA, okA := <-a
valB, okB := <-b
for okA && okB {
if PageItemLess(valA, valB) {
out <- valA
valA, okA = <-a
} else {
out <- valB
valB, okB = <-b
}
}
for okA {
out <- valA
valA, okA = <-a
}
for okB {
out <- valB
valB, okB = <-b
}
}

0 comments on commit 4d67f6b

Please sign in to comment.