forked from orgrim/pg_back
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsql.go
681 lines (581 loc) · 17.7 KB
/
sql.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
// pg_back
//
// Copyright 2011-2021 Nicolas Thauvin and contributors. All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions
// are met:
//
// 1. Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE AUTHORS ``AS IS'' AND ANY EXPRESS OR
// IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
// OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
// IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
// INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
// THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
package main
import (
"database/sql"
"errors"
"fmt"
"github.com/jackc/pgtype"
_ "github.com/jackc/pgx/v4/stdlib"
"strings"
"time"
)
type pg struct {
conn *sql.DB
version int
xlogOrWal string
}
func pgGetVersionNum(db *sql.DB) (int, error) {
var version int
query := "select setting from pg_settings where name = 'server_version_num'"
l.Verboseln("executing SQL query:", query)
err := db.QueryRow(query).Scan(&version)
if err != nil {
return 0, fmt.Errorf("could not get PostgreSQL server version: %s", err)
}
return version, nil
}
func dbOpen(conninfo *ConnInfo) (*pg, error) {
connstr := conninfo.String()
l.Verbosef("connecting to PostgreSQL with: \"%s\"", connstr)
db, err := sql.Open("pgx", connstr)
if err != nil {
return nil, fmt.Errorf("could not open database: %s", err)
}
if err := db.Ping(); err != nil {
db.Close()
return nil, fmt.Errorf("could not connect to database: %s", err)
}
newDB := new(pg)
newDB.conn = db
newDB.version, err = pgGetVersionNum(db)
if err != nil {
db.Close()
return nil, err
}
l.Verboseln("server num version is:", newDB.version)
// Keyword xlog has been replaced by wal as of PostgreSQL 10
if newDB.version >= 100000 {
newDB.xlogOrWal = "wal"
} else {
newDB.xlogOrWal = "xlog"
}
return newDB, nil
}
func (db *pg) Close() error {
l.Verboseln("closing connection to PostgreSQL")
return db.conn.Close()
}
func sqlQuoteLiteral(s string) string {
var o string
// Make standard_conforming_strings happy if the input
// contains some backslash
if strings.ContainsAny(s, "\\") {
o = "E"
}
o += "'"
// double single quotes and backslahses
o += strings.ReplaceAll(s, "'", "''")
o = strings.ReplaceAll(o, "\\", "\\\\")
o += "'"
return o
}
func sqlQuoteIdent(s string) string {
// just double the quote quotes, there are no escape for identifiers
return strings.ReplaceAll(s, "\"", "\"\"")
}
func listAllDatabases(db *pg, withTemplates bool) ([]string, error) {
var (
query string
dbname string
)
if withTemplates {
query = "select datname from pg_database where datallowconn;"
} else {
query = "select datname from pg_database where datallowconn and not datistemplate;"
}
dbs := make([]string, 0)
l.Verboseln("executing SQL query:", query)
rows, err := db.conn.Query(query)
if err != nil {
return dbs, fmt.Errorf("could not list databases: %s", err)
}
defer rows.Close()
for rows.Next() {
err := rows.Scan(&dbname)
if err != nil {
continue
}
dbs = append(dbs, dbname)
}
if err := rows.Err(); err != nil {
return dbs, fmt.Errorf("could not retrieve rows: %s", err)
}
return dbs, nil
}
func listDatabases(db *pg, withTemplates bool, excludedDbs []string, includedDbs []string) ([]string, error) {
var (
databases []string
err error
)
// When an explicit list of database is given, allow to select
// templates
if len(includedDbs) > 0 {
databases, err = listAllDatabases(db, true)
if err != nil {
return databases, err
}
realDbs := make([]string, 0, len(includedDbs))
nextidb:
for _, d := range includedDbs {
for _, e := range databases {
if d == e {
realDbs = append(realDbs, d)
continue nextidb
}
}
l.Warnf("database \"%s\" does not exists, excluded", d)
}
databases = realDbs
} else {
databases, err = listAllDatabases(db, withTemplates)
if err != nil {
return databases, err
}
}
// Exclude databases even if they are explicitly included
if len(excludedDbs) > 0 {
filtered := make([]string, 0, len(databases))
nextfdb:
for _, d := range databases {
for _, e := range excludedDbs {
if d == e {
continue nextfdb
}
}
filtered = append(filtered, d)
}
databases = filtered
}
return databases, nil
}
type pgVersionError struct {
s string
}
func (e *pgVersionError) Error() string {
return e.s
}
// pg_dumpacl stuff
func dumpCreateDBAndACL(db *pg, dbname string, force bool) (string, error) {
var s string
if dbname == "" {
return "", fmt.Errorf("empty input dbname")
}
// this query only work from 9.0, where datcollate and datctype were
// added to pg_database
if db.version < 90000 {
return "", &pgVersionError{s: "cluster version is older than 9.0, not dumping ACL"}
}
// this is no longer necessary after 11. Dumping ACL is the
// job of pg_dump so we have to check its version, not the
// server
if pgToolVersion("pg_dump") >= 110000 && !force {
l.Verboseln("no need to dump create database query and database ACL with pg_dump from >=11")
return "", nil
}
l.Infoln("dumping database creation and ACL commands of", dbname)
query := "SELECT coalesce(rolname, (select rolname from pg_authid where oid=(select datdba from pg_database where datname='template0'))), " +
" pg_encoding_to_char(d.encoding), " +
" datcollate, datctype, datistemplate, datacl, datconnlimit, " +
" (SELECT spcname FROM pg_tablespace t WHERE t.oid = d.dattablespace) AS dattablespace " +
"FROM pg_database d" +
" LEFT JOIN pg_authid u ON (datdba = u.oid) " +
"WHERE datallowconn AND datname = $1"
l.Verboseln("executing SQL query:", query)
rows, err := db.conn.Query(query, dbname)
if err != nil {
return "", fmt.Errorf("could not query database information for %s: %s", dbname, err)
}
defer rows.Close()
for rows.Next() {
var (
owner string
encoding string
collate string
ctype string
istemplate bool
acl pgtype.TextArray
connlimit int
tablespace string
)
err := rows.Scan(&owner, &encoding, &collate, &ctype, &istemplate, &acl, &connlimit, &tablespace)
if err != nil {
return "", fmt.Errorf("could not get row: %s", err)
}
if dbname != "template0" {
s += fmt.Sprintf("--\n-- Database creation\n--\n\n")
s += fmt.Sprintf("CREATE DATABASE \"%s\" WITH TEMPLATE = template0 OWNER = \"%s\"", sqlQuoteIdent(dbname), sqlQuoteIdent(owner))
s += fmt.Sprintf(" ENCODING = %s", sqlQuoteLiteral(encoding))
s += fmt.Sprintf(" LC_COLLATE = %s", sqlQuoteLiteral(collate))
s += fmt.Sprintf(" LC_CTYPE = %s", sqlQuoteLiteral(ctype))
if tablespace != "pg_default" {
s += fmt.Sprintf(" TABLESPACE = \"%s\"", sqlQuoteIdent(tablespace))
}
if connlimit != -1 {
s += fmt.Sprintf(" CONNECTION LIMIT = %d", connlimit)
}
s += fmt.Sprintf(";\n\n")
if istemplate {
s += fmt.Sprintf("UPDATE pg_catalog.pg_database SET datistemplate = 't' WHERE datname = %s;\n", sqlQuoteLiteral(dbname))
}
}
// When all privileges are revoked from public, there
// isn't any acl entry in the list showing this. So
// when the list is not empty and no acl are granted
// to public, we have to output a revoke statement for
// public, before any grant.
if len(acl.Elements) > 0 {
var (
t string
revokeAll bool = true
)
s += fmt.Sprintf("--\n-- Database privileges \n--\n\n")
for _, e := range acl.Elements {
if e.Status == pgtype.Null {
continue
}
if strings.HasPrefix(e.String, "=") {
revokeAll = false
}
t += makeACLCommands(e.String, dbname, owner)
}
if revokeAll {
s += fmt.Sprintf("REVOKE CONNECT, TEMPORARY ON DATABASE \"%s\" FROM PUBLIC;\n", sqlQuoteIdent(dbname))
}
s += t
}
}
err = rows.Err()
if err != nil {
return s, fmt.Errorf("could not retrive rows: %s", err)
}
return s, nil
}
func makeACLCommands(aclitem string, dbname string, owner string) string {
var s string
// the aclitem format is "grantee=privs/grantor" where privs
// is a list of letters, one for each privilege followed by *
// when grantee as WITH GRANT OPTION for it
t := strings.Split(aclitem, "=")
grantee := t[0]
if len(t) != 2 {
return ""
}
t = strings.Split(t[1], "/")
privs := t[0]
if len(t) != 2 {
return ""
}
grantor := t[1]
// public role: when the privs differ from the default, issue grants
if grantee == "" {
grantee = "PUBLIC"
if privs != "Tc" {
s += fmt.Sprintf("REVOKE ALL ON DATABASE \"%s\" FROM PUBLIC;\n", sqlQuoteIdent(dbname))
} else {
return s
}
}
// owner: when other roles have been given privileges, all
// privileges are shown for the owner
if grantee == owner {
if privs != "CTc" {
s += fmt.Sprintf("REVOKE ALL ON DATABASE \"%s\" FROM \"%s\";\n", sqlQuoteIdent(dbname), sqlQuoteIdent(grantee))
} else {
return s
}
}
if grantor != owner {
s += fmt.Sprintf("SET SESSION AUTHORIZATION \"%s\";\n", sqlQuoteIdent(grantor))
}
for i, b := range privs {
switch b {
case 'C':
s += fmt.Sprintf("GRANT CREATE ON DATABASE \"%s\" TO \"%s\"", sqlQuoteIdent(dbname), sqlQuoteIdent(grantee))
case 'T':
s += fmt.Sprintf("GRANT TEMPORARY ON DATABASE \"%s\" TO \"%s\"", sqlQuoteIdent(dbname), sqlQuoteIdent(grantee))
case 'c':
s += fmt.Sprintf("GRANT CONNECT ON DATABASE \"%s\" TO \"%s\"", sqlQuoteIdent(dbname), sqlQuoteIdent(grantee))
}
if i+1 < len(privs) {
if privs[i+1] == '*' {
s += fmt.Sprintf(" WITH GRANT OPTION;\n")
} else if privs[i] != '*' {
s += fmt.Sprintf(";\n")
}
} else if privs[i] != '*' {
s += fmt.Sprintf(";\n")
}
}
if grantor != owner {
s += fmt.Sprintf("RESET SESSION AUTHORIZATION;\n")
}
return s
}
func dumpDBConfig(db *pg, dbname string) (string, error) {
var s string
if dbname == "" {
return "", fmt.Errorf("empty input dbname")
}
// this query only work from 9.0, where pg_db_role_setting was introduced
if db.version < 90000 {
return "", &pgVersionError{s: "cluster version is older than 9.0, not dumping database configuration"}
}
// this is no longer necessary after 11. Dumping ACL is the
// job of pg_dump so we have to check its version, not the
// server
if pgToolVersion("pg_dump") >= 110000 {
l.Verboseln("no need to dump database configuration with pg_dump from >=11")
return "", nil
}
l.Infoln("dumping database configuration commands of", dbname)
// dump per database config
query := "SELECT CASE setrole WHEN 0 THEN NULL ELSE pg_get_userbyid(setrole) END, unnest(setconfig) FROM pg_db_role_setting WHERE setdatabase = (SELECT oid FROM pg_database WHERE datname = $1) ORDER BY 1, 2"
l.Verboseln("executing SQL query:", query)
rows, err := db.conn.Query(query, dbname)
if err != nil {
return "", fmt.Errorf("could not query database configuration for %s: %s", dbname, err)
}
defer rows.Close()
for rows.Next() {
var (
role pgtype.Text
keyVal string
)
err := rows.Scan(&role, &keyVal)
if err != nil {
return "", fmt.Errorf("could not get row: %s", err)
}
// split
tokens := strings.Split(keyVal, "=")
// do not quote the value for those two parameters
if tokens[0] != "DateStyle" && tokens[0] != "search_path" {
tokens[1] = fmt.Sprintf("'%s'", tokens[1])
}
if role.Status != pgtype.Null {
s += fmt.Sprintf("ALTER ROLE \"%s\" IN DATABASE \"%s\" SET \"%s\" TO %s;\n", role.String, dbname, tokens[0], tokens[1])
} else {
s += fmt.Sprintf("ALTER DATABASE \"%s\" SET \"%s\" TO %s;\n", dbname, tokens[0], tokens[1])
}
}
err = rows.Err()
if err != nil {
return "", fmt.Errorf("could not retrive rows: %s", err)
}
return s, nil
}
func showSettings(db *pg) (string, error) {
var s, query string
if db.version < 80400 {
return "", &pgVersionError{s: "cluster version is older than 8.4, not dumping configuration"}
}
if db.version >= 90500 {
// use pg_show_all_file_settings() from 9.5+ to get
// the non default values set in the files and
// applied, this avoid duplicates when multiple files
// define parameters.
query = "SELECT name, setting FROM pg_show_all_file_settings() WHERE applied ORDER BY name"
} else {
query = "SELECT name, setting FROM pg_settings WHERE sourcefile IS NOT NULL"
}
l.Verboseln("executing SQL query:", query)
rows, err := db.conn.Query(query)
if err != nil {
return "", fmt.Errorf("could not query instance configuration: %s", err)
}
defer rows.Close()
for rows.Next() {
var (
name string
value string
)
err := rows.Scan(&name, &value)
if err != nil {
l.Errorln(err)
continue
}
if name != "DateStyle" && name != "search_path" {
value = fmt.Sprintf("'%s'", value)
}
s += fmt.Sprintf("%s = %s\n", name, value)
}
err = rows.Err()
if err != nil {
return "", fmt.Errorf("could not retrive rows: %s", err)
}
if db.version >= 90500 {
return s, nil
} else {
// when dumping settings from pg_settings, some
// settings may not be found because their value can
// set a higher levels than configuration files
return s, &pgVersionError{s: "cluster version is older than 9.5, settings from configuration files could be missing if the SET command was used"}
}
}
func extractFileFromSettings(db *pg, name string) (string, error) {
query := "SELECT setting, pg_read_file(setting, 0, (pg_stat_file(setting)).size) FROM pg_settings WHERE name = $1"
l.Verboseln("executing SQL query:", query)
rows, err := db.conn.Query(query, name)
if err != nil {
return "", fmt.Errorf("could not query file contents from settings: %s", err)
}
defer rows.Close()
var result string
for rows.Next() {
var (
path string
contents string
)
err := rows.Scan(&path, &contents)
if err != nil {
l.Errorln(err)
continue
}
result = fmt.Sprintf("# path: %s\n%s\n", path, contents)
}
err = rows.Err()
if err != nil {
return "", fmt.Errorf("could not retrive rows: %s", err)
}
return result, nil
}
type pgReplicaHasLocks struct{}
func (*pgReplicaHasLocks) Error() string {
return "replication not paused because of AccessExclusiveLock"
}
func pauseReplication(db *pg) error {
// If an AccessExclusiveLock is granted when the replay is
// paused, it will remain and pg_dump would be stuck forever
query := fmt.Sprintf("SELECT pg_%s_replay_pause() "+
"WHERE NOT EXISTS (SELECT 1 FROM pg_locks WHERE mode = 'AccessExclusiveLock') "+
"AND pg_is_in_recovery();", db.xlogOrWal)
l.Verboseln("executing SQL query:", query)
rows, err := db.conn.Query(query)
if err != nil {
return fmt.Errorf("could not pause replication: %s", err)
}
defer rows.Close()
// The query returns a single row with one column of type void,
// which is and empty string, on success. It does not return
// any row on failure
void := "failed"
for rows.Next() {
err := rows.Scan(&void)
if err != nil {
return fmt.Errorf("could not get row: %s", err)
}
}
if void == "failed" {
return &pgReplicaHasLocks{}
}
return nil
}
func canPauseReplication(db *pg) (bool, error) {
// hot standby exists from 9.0
if db.version < 90000 {
return false, nil
}
query := fmt.Sprintf("SELECT 1 FROM pg_proc "+
"WHERE proname='pg_%s_replay_pause' AND pg_is_in_recovery()", db.xlogOrWal)
l.Verboseln("executing SQL query:", query)
rows, err := db.conn.Query(query)
if err != nil {
return false, fmt.Errorf("could not check if replication is pausable: %s", err)
}
defer rows.Close()
// The query returns 1 on success, no row on failure
var one int
for rows.Next() {
err := rows.Scan(&one)
if err != nil {
return false, fmt.Errorf("could not get row: %s", err)
}
}
if one == 0 {
return false, nil
}
return true, nil
}
func pauseReplicationWithTimeout(db *pg, timeOut int) error {
if ok, err := canPauseReplication(db); !ok {
return err
}
ticker := time.NewTicker(time.Duration(10) * time.Second)
done := make(chan bool)
stop := make(chan bool)
fail := make(chan error)
l.Infoln("pausing replication")
// We want to retry pausing replication at a defined interval
// but not forever. We cannot put the timeout in the same
// select as the ticker since the ticker would always win
go func() {
var rerr *pgReplicaHasLocks
defer ticker.Stop()
for {
if err := pauseReplication(db); err != nil {
if errors.As(err, &rerr) {
l.Warnln(err)
} else {
fail <- err
return
}
} else {
done <- true
return
}
select {
case <-stop:
return
case <-ticker.C:
break
}
}
}()
// Return as soon as the replication is paused or stop the
// goroutine if we hit the timeout
select {
case <-done:
l.Infoln("replication paused")
case <-time.After(time.Duration(timeOut) * time.Second):
stop <- true
return fmt.Errorf("replication not paused after %v", time.Duration(timeOut)*time.Second)
case err := <-fail:
return fmt.Errorf("%s", err)
}
return nil
}
func resumeReplication(db *pg) error {
if ok, err := canPauseReplication(db); !ok {
return err
}
l.Infoln("resuming replication")
query := fmt.Sprintf("SELECT pg_%s_replay_resume() WHERE pg_is_in_recovery();", db.xlogOrWal)
l.Verboseln("executing SQL query:", query)
_, err := db.conn.Exec(query)
if err != nil {
return fmt.Errorf("could not resume replication: %s", err)
}
return nil
}