Extract size of every page in wikitext format
brawer committed May 11, 2024
1 parent 30c0b27 commit 781d26b
Showing 2 changed files with 146 additions and 31 deletions.
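In brief: processPageTable now emits a size signal for every wikitext page, in addition to the page-to-entity mapping that was previously collected only for wikidatawiki, and a new pageSignalMerger collapses the sorted signal stream into one output line per page. As an illustration (values taken from the doc comments below), the two input lines

    200,Q72
    200,s=830167

merge into the single output line

    200,Q72,830167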
110 changes: 86 additions & 24 deletions cmd/qrank-builder/pageentities.go
@@ -16,6 +16,7 @@ import (
 	"runtime"
 	"slices"
 	"sort"
+	"strconv"
 	"strings"
 
 	"golang.org/x/sync/errgroup"
@@ -120,22 +121,17 @@ func buildSitePageEntities(site WikiSite, ctx context.Context, dumps string, s3
 	})
 	group.Go(func() error {
 		sorter.Sort(groupCtx)
+		merger := NewPageSignalMerger(writer)
 		for {
 			select {
 			case <-groupCtx.Done():
 				return groupCtx.Err()
 			case line, more := <-outChan:
 				if !more {
-					return nil
+					return merger.Close()
 				}
-				var buf bytes.Buffer
-				if _, err := buf.WriteString(line); err != nil {
-					return err
-				}
-				if err := buf.WriteByte('\n'); err != nil {
-					return err
-				}
-				if _, err := buf.WriteTo(writer); err != nil {
+				err := merger.Process(line)
+				if err != nil {
 					return err
 				}
 			}
@@ -147,11 +143,6 @@ func buildSitePageEntities(site WikiSite, ctx context.Context, dumps string, s3
 	if err := <-errChan; err != nil {
 		return err
 	}
-
-	if err := writer.Close(); err != nil {
-		return err
-	}
-
 	if err := outFile.Close(); err != nil {
 		return err
 	}
@@ -224,14 +215,7 @@ var wikidataTitleRe = regexp.MustCompile(`^Q\d+$`)
 // processPageTable processes a dump of the `page` table for a Wikimedia site.
 // Called by function buildSitePageEntities().
 func processPageTable(ctx context.Context, dumps string, site *WikiSite, out chan<- string) error {
-	// For now, we only need to process the page table of wikidatawiki.
-	// This will change once we collect page sizes.
-	// https://github.com/brawer/wikidata-qrank/issues/38
-	isWikidata := site.Key == "wikidatawiki"
-	if !isWikidata {
-		return nil
-	}
-
 	ymd := site.LastDumped.Format("20060102")
 	propsFileName := fmt.Sprintf("%s-%s-page.sql.gz", site.Key, ymd)
 	propsPath := filepath.Join(dumps, site.Key, ymd, propsFileName)
@@ -256,7 +240,8 @@ func processPageTable(ctx context.Context, dumps string, site *WikiSite, out cha
 	pageCol := slices.Index(columns, "page_id")
 	namespaceCol := slices.Index(columns, "page_namespace")
 	titleCol := slices.Index(columns, "page_title")
-	// lenCol := slices.Index(columns, "page_len")
+	contentModelCol := slices.Index(columns, "page_content_model")
+	lenCol := slices.Index(columns, "page_len")
 
 	for {
 		select {
@@ -285,9 +270,11 @@ func processPageTable(ctx context.Context, dumps string, site *WikiSite, out cha
 				}
 			}
 
-			// TODO: Collect page sizes.
+			// Collect page sizes.
 			// https://github.com/brawer/wikidata-qrank/issues/38
-			// out <- fmt.Sprintf("%s,M:%s", row[pageCol], row[lenCol])
+			if row[contentModelCol] == "wikitext" {
+				out <- fmt.Sprintf("%s,s=%s", row[pageCol], row[lenCol])
+			}
 		}
 	}
 
@@ -426,3 +413,78 @@ func (s *pageEntitiesScanner) Text() string {
 func (s *pageEntitiesScanner) Err() error {
 	return s.err
 }
+
+// pageSignalMerger aggregates per-page signals from different sources
+// into a single output line. Input and output are keyed by page id.
+type pageSignalMerger struct {
+	writer   io.WriteCloser
+	page     string
+	entity   string
+	pageSize int64
+}
+
+func NewPageSignalMerger(w io.WriteCloser) *pageSignalMerger {
+	return &pageSignalMerger{writer: w}
+}
+
+// Process handles one line of input.
+// Input must be grouped by page (such as by sorting lines).
+// Recognized line formats:
+//
+//	"200,Q72": wikipage #200 is for Wikidata entity Q72
+//	"200,s=830167": wikipage #200 is 830167 bytes in size
+func (m *pageSignalMerger) Process(line string) error {
+	pos := strings.IndexByte(line, ',')
+	page := line[:pos]
+	if page != m.page {
+		if err := m.write(); err != nil {
+			return err
+		}
+		m.page = page
+	}
+
+	switch line[pos+1] {
+	case 'Q':
+		m.entity = line[pos+1:]
+	case 's':
+		if line[pos+2] == '=' {
+			if n, err := strconv.ParseInt(line[pos+3:], 10, 64); err == nil {
+				m.pageSize += n
+			}
+		}
+	}
+
+	return nil
+}
+
+func (m *pageSignalMerger) Close() error {
+	if err := m.write(); err != nil {
+		return err
+	}
+
+	if err := m.writer.Close(); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (m *pageSignalMerger) write() error {
+	var err error
+	if m.page != "" && m.entity != "" {
+		var buf bytes.Buffer
+		buf.WriteString(m.page)
+		buf.WriteByte(',')
+		buf.WriteString(m.entity)
+		buf.WriteByte(',')
+		buf.WriteString(strconv.FormatInt(m.pageSize, 10))
+		buf.WriteByte('\n')
+		_, err = m.writer.Write(buf.Bytes())
+	}
+
+	m.page = ""
+	m.entity = ""
+	m.pageSize = 0
+
+	return err
+}
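For orientation, a minimal usage sketch of the new merger (illustrative only, not part of this commit; it assumes code in the same package, and uses os.Stdout, which implements io.WriteCloser):

	// Feed sorted "page,signal" lines into the merger. Grouping works
	// because lines sort by page id, and for a given page the "Q..."
	// entity line sorts before the "s=..." size line ('Q' < 's' in ASCII).
	m := NewPageSignalMerger(os.Stdout)
	for _, line := range []string{"200,Q72", "200,s=830167"} {
		if err := m.Process(line); err != nil {
			log.Fatal(err)
		}
	}
	if err := m.Close(); err != nil { // writes "200,Q72,830167"
		log.Fatal(err)
	}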
67 changes: 60 additions & 7 deletions cmd/qrank-builder/pageentities_test.go
@@ -7,6 +7,7 @@ import (
 	"bytes"
 	"context"
 	"fmt"
+	"io"
 	"log"
 	"path/filepath"
 	"reflect"
@@ -51,9 +52,12 @@ func TestBuildPageEntities(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	got = strings.Join(gotLines, " | ")
-	want = "1,Q5296 | 3824,Q662541 | 799,Q72"
-	if got != want {
-		t.Errorf("got %v, want %v", got, want)
+	wantLines := []string{
+		"1,Q5296,2500",
+		"3824,Q662541,4973",
+		"799,Q72,3142",
+	}
+	if !slices.Equal(gotLines, wantLines) {
+		t.Errorf("got %v, want %v", gotLines, wantLines)
 	}
 
@@ -67,10 +71,15 @@ func TestBuildPageEntities(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	got = strings.Join(gotLines, " | ")
-	want = "1,Q107661323 | 19441465,Q5296 | 200,Q72 | 5411171,Q5649951 | 623646,Q662541"
-	if got != want {
-		t.Errorf("got %v, want %v", got, want)
+	wantLines = []string{
+		"1,Q107661323,3470",
+		"19441465,Q5296,372",
+		"200,Q72,0",
+		"5411171,Q5649951,0",
+		"623646,Q662541,0",
+	}
+	if !slices.Equal(gotLines, wantLines) {
+		t.Errorf("got %v, want %v", gotLines, wantLines)
 	}
 
 	// Verify that obsolete files have been cleaned up.
@@ -156,3 +165,47 @@ func storeFakePageEntities(id string, content string, s3 *FakeS3, t *testing.T)
 	path := fmt.Sprintf("page_entities/%s-page_entities.zst", id)
 	s3.data[path] = buf.Bytes()
 }
+
+func TestPageSignalMerger(t *testing.T) {
+	var buf strings.Builder
+	m := NewPageSignalMerger(NopWriteCloser(&buf))
+	for _, line := range []string{
+		"11,s=1111111",
+		"22,Q72",
+		"22,s=830167",
+		"333,Q3",
+	} {
+		if err := m.Process(line); err != nil {
+			t.Error(err)
+		}
+	}
+	if err := m.Close(); err != nil {
+		t.Error(err)
+	}
+	got := strings.Split(strings.TrimSuffix(buf.String(), "\n"), "\n")
+	want := []string{
+		"22,Q72,830167",
+		"333,Q3,0",
+	}
+	if !slices.Equal(got, want) {
+		t.Errorf("got %v, want %v", got, want)
+	}
+}
+
+// NopWriteCloser returns a WriteCloser with a no-op Close method wrapping the
+// provided Writer w. Like io.NopCloser, but for writing.
+func NopWriteCloser(w io.Writer) io.WriteCloser {
+	return &nopWriteCloser{w}
+}
+
+type nopWriteCloser struct {
+	writer io.Writer
+}
+
+func (n *nopWriteCloser) Close() error {
+	return nil
+}
+
+func (n *nopWriteCloser) Write(p []byte) (int, error) {
+	return n.writer.Write(p)
+}
