Skip to content

Commit

Permalink
Merge pull request #1 from datnguye/features/add-tables-filter
Browse files Browse the repository at this point in the history
Features/add tables filter
  • Loading branch information
datnguye authored May 31, 2021
2 parents 163c1dd + 72b9aca commit bd980e2
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 12 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,8 @@ Install dependencies:
pip install -r requirements.txt
```


## TODO:
* Add schema list
* Support Azure Synapse connector

3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pyyaml==5.4.1
certifi==2020.12.5
snowflake-connector-python==2.4.3
snowflake-connector-python[pandas]==2.4.3
snowflake-connector-python[pandas]
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

setup(
name='snowbim',
version='1.0.9',
version='1.0.10',
author='datnguye',
author_email='[email protected]',
packages=find_packages(),
Expand Down
8 changes: 6 additions & 2 deletions snowbim/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

def main():
parser = argparse.ArgumentParser(prog='snowbim')
parser.add_argument('-v','--version', action='version', version='%(prog)s 1.0.9')
parser.add_argument('-v','--version', action='version', version='%(prog)s 1.0.10')

parser.add_argument('--bim', help='File path to an input .bim file', type=str, default='')
parser.add_argument('--out', help='File path to an out .bim file. Optional', type=str, default='')
Expand All @@ -12,6 +12,8 @@ def main():
parser.add_argument('--target', help='dbt profile target. Optional, default: dev', type=str, default='dev')
parser.add_argument('--db', help='Snowflake database. Optional, default: DEMO_DB', type=str, default='DEMO_DB')
parser.add_argument('--schema', help='Snowflake database\'s schema. Optional, default: PUBLIC', type=str, default='PUBLIC')
parser.add_argument('--tables_inc', help='Snowflake tables inclusive. Items are in string splitted by comma. Optional, default: (all)', type=str, default='')
parser.add_argument('--tables_exc', help='Snowflake tables exclusive. Items are in string splitted by comma. Optional, default: (none)', type=str, default='')

args = parser.parse_args()
snowbim.upgrade_schema( bim_path = args.bim,
Expand All @@ -20,7 +22,9 @@ def main():
profile = args.profile,
target = args.target,
db = args.db,
schema = args.schema)
schema = args.schema,
tables = [x.strip() for x in args.tables_inc.split(',')] if args.tables_inc else [],
exclude_tables = [x.strip() for x in args.tables_exc.split(',')] if args.tables_exc else [])

if __name__ == '__main__':
main()
37 changes: 33 additions & 4 deletions snowbim/engines/snowengine.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def connect(profile_dir:str=None, profile:str=None, target:str=None, db:str=None
return (0, conn, None)


def compare_schema(snowflake_conn, bim_path:str=None, mode:str='directQuery', table_excludes:list=[]):
def compare_schema(snowflake_conn, bim_path:str=None, mode:str='directQuery', tables:list=[], exclude_tables:list=[]):
'''
Get the changes of snowflake database schema
Currently support only for
Expand All @@ -74,12 +74,41 @@ def compare_schema(snowflake_conn, bim_path:str=None, mode:str='directQuery', ta
snowflake_schema = []
cur = snowflake_conn.cursor()

cur.execute(f'SELECT * FROM "INFORMATION_SCHEMA"."TABLES" WHERE "TABLE_SCHEMA" = \'{snowflake_conn.schema}\' ORDER BY "TABLE_SCHEMA", "TABLE_NAME"')
filterred_tables_string = "1 = 1"
filterred_exctables_string = "1 = 1"
if tables and len(tables) > 0:
filterred_tables_string = ','.join(tables)
filterred_tables_string = filterred_tables_string.replace(',',"','")
filterred_tables_string = f"\"TABLE_NAME\" IN ('{filterred_tables_string}')"
if exclude_tables and len(exclude_tables) > 0:
filterred_exctables_string = ','.join(exclude_tables)
filterred_exctables_string = filterred_exctables_string.replace(',',"','")
filterred_exctables_string = f"\"TABLE_NAME\" NOT IN ('{filterred_exctables_string}')"

cur.execute(f'''
SELECT "TABLE_NAME",
"TABLE_TYPE"
FROM "INFORMATION_SCHEMA"."TABLES"
WHERE "TABLE_SCHEMA" = \'{snowflake_conn.schema}\'
AND {filterred_tables_string}
AND {filterred_exctables_string}
ORDER BY "TABLE_SCHEMA", "TABLE_NAME"
''')
df_tables = cur.fetch_pandas_all()

cur.execute(f'SELECT * FROM "INFORMATION_SCHEMA"."COLUMNS" WHERE "TABLE_SCHEMA" = \'{snowflake_conn.schema}\' ORDER BY "TABLE_SCHEMA", "TABLE_NAME", "COLUMN_NAME"')
cur.execute(f'''
SELECT "TABLE_SCHEMA",
"TABLE_NAME",
"COLUMN_NAME",
"DATA_TYPE"
FROM "INFORMATION_SCHEMA"."COLUMNS"
WHERE "TABLE_SCHEMA" = \'{snowflake_conn.schema}\'
AND {filterred_tables_string}
AND {filterred_exctables_string}
ORDER BY "TABLE_SCHEMA", "TABLE_NAME", "COLUMN_NAME"
''')
df_columns = cur.fetch_pandas_all()

for index, item in df_tables.iterrows():
table_item = {
"name": item['TABLE_NAME'],
Expand Down
14 changes: 10 additions & 4 deletions snowbim/snowbim.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from snowbim.engines import snowengine, bimengine

def get_schema_changes(bim_path=None, profile_dir:str=None, profile:str=None, target:str=None, db:str=None, schema:str=None):
def get_schema_changes(bim_path=None, profile_dir:str=None, profile:str=None, target:str=None, db:str=None, schema:str=None, tables:list=[], exclude_tables:list=[]):
'''
get_schema_changes
'''
Expand All @@ -9,16 +9,22 @@ def get_schema_changes(bim_path=None, profile_dir:str=None, profile:str=None, ta
print(' Connected')

print('Schema comparing...')
changes = snowengine.compare_schema(snowflake_conn=conn[1], bim_path=bim_path)
changes = snowengine.compare_schema(snowflake_conn=conn[1], bim_path=bim_path, tables=tables, exclude_tables=exclude_tables)
print(' Done')

return changes

def upgrade_schema(bim_path=None, out_bim_path=None, profile_dir:str=None, profile:str=None, target:str=None, db:str=None, schema:str=None):
def upgrade_schema(bim_path=None, out_bim_path=None, profile_dir:str=None, profile:str=None, target:str=None, db:str=None, schema:str=None, tables:list=[], exclude_tables:list=[]):
'''
upgrade_schema
'''
changes = get_schema_changes(bim_path=bim_path, profile_dir=profile_dir, profile=profile, target=target, db=db, schema=schema)
print(f"""Getting changes against:
+ Database: {db}
+ Schema: {schema}
+ Tables inclusive: {','.join(tables) or '(all)'}
+ Tables exclusive: {','.join(exclude_tables) or '(none)'}
""")
changes = get_schema_changes(bim_path=bim_path, profile_dir=profile_dir, profile=profile, target=target, db=db, schema=schema, tables=tables, exclude_tables=exclude_tables)
if changes[0] == 0:
changes = changes[1]
else:
Expand Down

0 comments on commit bd980e2

Please sign in to comment.