From 2c527a37ba0a797b913719d1125dee63c0b5ad88 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Wed, 7 Jun 2023 11:59:39 -0700 Subject: [PATCH] Replace unicodecsv with standard csv library (#31693) unicodecsv appears to be missing a license which can cause trouble (see https://github.com/jdunck/python-unicodecsv/issues/80) And it appears that this library may no longer be required. GitOrigin-RevId: fbeb01cb17b7cb9c2e27ac7010f423a2bced78b4 --- airflow/providers/apache/hive/hooks/hive.py | 6 +- .../apache/hive/transfers/mssql_to_hive.py | 6 +- .../apache/hive/transfers/mysql_to_hive.py | 7 +- .../apache/hive/transfers/vertica_to_hive.py | 7 +- .../google/cloud/transfers/sql_to_gcs.py | 16 +-- .../transfers/oracle_to_azure_data_lake.py | 10 +- .../mysql/transfers/vertica_to_mysql.py | 6 +- docs/spelling_wordlist.txt | 1 + setup.cfg | 1 - .../hive/transfers/test_mssql_to_hive.py | 5 +- .../hive/transfers/test_mysql_to_hive.py | 5 +- .../google/cloud/transfers/test_sql_to_gcs.py | 124 +++++++----------- .../test_oracle_to_azure_data_lake.py | 7 +- 13 files changed, 84 insertions(+), 117 deletions(-) diff --git a/airflow/providers/apache/hive/hooks/hive.py b/airflow/providers/apache/hive/hooks/hive.py index 89548fea8d6..fea826a8c39 100644 --- a/airflow/providers/apache/hive/hooks/hive.py +++ b/airflow/providers/apache/hive/hooks/hive.py @@ -37,7 +37,7 @@ raise AirflowOptionalProviderFeatureException(e) -import unicodecsv as csv +import csv from airflow.configuration import conf from airflow.exceptions import AirflowException @@ -989,8 +989,8 @@ def to_csv( message = None i = 0 - with open(csv_filepath, "wb") as file: - writer = csv.writer(file, delimiter=delimiter, lineterminator=lineterminator, encoding="utf-8") + with open(csv_filepath, "w", encoding="utf-8") as file: + writer = csv.writer(file, delimiter=delimiter, lineterminator=lineterminator) try: if output_header: self.log.debug("Cursor description is %s", header) diff --git a/airflow/providers/apache/hive/transfers/mssql_to_hive.py b/airflow/providers/apache/hive/transfers/mssql_to_hive.py index 4e9176bb6ce..ccfef831e5d 100644 --- a/airflow/providers/apache/hive/transfers/mssql_to_hive.py +++ b/airflow/providers/apache/hive/transfers/mssql_to_hive.py @@ -18,12 +18,12 @@ """This module contains an operator to move data from MSSQL to Hive.""" from __future__ import annotations +import csv from collections import OrderedDict from tempfile import NamedTemporaryFile from typing import TYPE_CHECKING, Sequence import pymssql -import unicodecsv as csv from airflow.models import BaseOperator from airflow.providers.apache.hive.hooks.hive import HiveCliHook @@ -113,8 +113,8 @@ def execute(self, context: Context): with mssql.get_conn() as conn: with conn.cursor() as cursor: cursor.execute(self.sql) - with NamedTemporaryFile("w") as tmp_file: - csv_writer = csv.writer(tmp_file, delimiter=self.delimiter, encoding="utf-8") + with NamedTemporaryFile(mode="w", encoding="utf-8") as tmp_file: + csv_writer = csv.writer(tmp_file, delimiter=self.delimiter) field_dict = OrderedDict() for col_count, field in enumerate(cursor.description, start=1): col_position = f"Column{col_count}" diff --git a/airflow/providers/apache/hive/transfers/mysql_to_hive.py b/airflow/providers/apache/hive/transfers/mysql_to_hive.py index 639723611ed..003b6e3f838 100644 --- a/airflow/providers/apache/hive/transfers/mysql_to_hive.py +++ b/airflow/providers/apache/hive/transfers/mysql_to_hive.py @@ -18,13 +18,13 @@ """This module contains 
an operator to move data from MySQL to Hive.""" from __future__ import annotations +import csv from collections import OrderedDict from contextlib import closing from tempfile import NamedTemporaryFile from typing import TYPE_CHECKING, Sequence import MySQLdb -import unicodecsv as csv from airflow.models import BaseOperator from airflow.providers.apache.hive.hooks.hive import HiveCliHook @@ -84,7 +84,7 @@ def __init__( recreate: bool = False, partition: dict | None = None, delimiter: str = chr(1), - quoting: str | None = None, + quoting: int | None = None, quotechar: str = '"', escapechar: str | None = None, mysql_conn_id: str = "mysql_default", @@ -133,7 +133,7 @@ def execute(self, context: Context): hive = HiveCliHook(hive_cli_conn_id=self.hive_cli_conn_id, auth=self.hive_auth) mysql = MySqlHook(mysql_conn_id=self.mysql_conn_id) self.log.info("Dumping MySQL query results to local file") - with NamedTemporaryFile("wb") as f: + with NamedTemporaryFile(mode="w", encoding="utf-8") as f: with closing(mysql.get_conn()) as conn: with closing(conn.cursor()) as cursor: cursor.execute(self.sql) @@ -143,7 +143,6 @@ def execute(self, context: Context): quoting=self.quoting, quotechar=self.quotechar if self.quoting != csv.QUOTE_NONE else None, escapechar=self.escapechar, - encoding="utf-8", ) field_dict = OrderedDict() if cursor.description is not None: diff --git a/airflow/providers/apache/hive/transfers/vertica_to_hive.py b/airflow/providers/apache/hive/transfers/vertica_to_hive.py index ce6233052d0..78c14e0e934 100644 --- a/airflow/providers/apache/hive/transfers/vertica_to_hive.py +++ b/airflow/providers/apache/hive/transfers/vertica_to_hive.py @@ -18,12 +18,11 @@ """This module contains an operator to move data from Vertica to Hive.""" from __future__ import annotations +import csv from collections import OrderedDict from tempfile import NamedTemporaryFile from typing import TYPE_CHECKING, Any, Sequence -import unicodecsv as csv - from airflow.models import BaseOperator from airflow.providers.apache.hive.hooks.hive import HiveCliHook from airflow.providers.vertica.hooks.vertica import VerticaHook @@ -118,8 +117,8 @@ def execute(self, context: Context): conn = vertica.get_conn() cursor = conn.cursor() cursor.execute(self.sql) - with NamedTemporaryFile("w") as f: - csv_writer = csv.writer(f, delimiter=self.delimiter, encoding="utf-8") + with NamedTemporaryFile(mode="w", encoding="utf-8") as f: + csv_writer = csv.writer(f, delimiter=self.delimiter) field_dict = OrderedDict() for col_count, field in enumerate(cursor.description, start=1): col_position = f"Column{col_count}" diff --git a/airflow/providers/google/cloud/transfers/sql_to_gcs.py b/airflow/providers/google/cloud/transfers/sql_to_gcs.py index a60f5cb2798..3bd8af683f7 100644 --- a/airflow/providers/google/cloud/transfers/sql_to_gcs.py +++ b/airflow/providers/google/cloud/transfers/sql_to_gcs.py @@ -19,6 +19,7 @@ from __future__ import annotations import abc +import csv import json import os from tempfile import NamedTemporaryFile @@ -26,7 +27,6 @@ import pyarrow as pa import pyarrow.parquet as pq -import unicodecsv as csv from airflow.models import BaseOperator from airflow.providers.google.cloud.hooks.gcs import GCSHook @@ -286,12 +286,10 @@ def _write_local_data_files(self, cursor): row = self.convert_types(schema, col_type_dict, row) row_dict = dict(zip(schema, row)) - tmp_file_handle.write( - json.dumps(row_dict, sort_keys=True, ensure_ascii=False).encode("utf-8") - ) + json.dump(row_dict, tmp_file_handle, sort_keys=True, 
ensure_ascii=False) # Append newline to make dumps BigQuery compatible. - tmp_file_handle.write(b"\n") + tmp_file_handle.write("\n") # Stop if the file exceeds the file size limit. fppos = tmp_file_handle.tell() @@ -323,7 +321,7 @@ def _write_local_data_files(self, cursor): def _get_file_to_upload(self, file_mime_type, file_no): """Returns a dictionary that represents the file to upload.""" - tmp_file_handle = NamedTemporaryFile(delete=True) + tmp_file_handle = NamedTemporaryFile(mode="w", encoding="utf-8", delete=True) return ( { "file_name": self.filename.format(file_no), @@ -347,7 +345,7 @@ def _configure_csv_file(self, file_handle, schema): """Configure a csv writer with the file_handle and write schema as headers for the new file. """ - csv_writer = csv.writer(file_handle, encoding="utf-8", delimiter=self.field_delimiter) + csv_writer = csv.writer(file_handle, delimiter=self.field_delimiter) csv_writer.writerow(schema) return csv_writer @@ -436,8 +434,8 @@ def _write_local_schema_file(self, cursor): self.log.info("Using schema for %s", self.schema_filename) self.log.debug("Current schema: %s", schema) - tmp_schema_file_handle = NamedTemporaryFile(delete=True) - tmp_schema_file_handle.write(schema.encode("utf-8")) + tmp_schema_file_handle = NamedTemporaryFile(mode="w", encoding="utf-8", delete=True) + tmp_schema_file_handle.write(schema) schema_file_to_upload = { "file_name": self.schema_filename, "file_handle": tmp_schema_file_handle, diff --git a/airflow/providers/microsoft/azure/transfers/oracle_to_azure_data_lake.py b/airflow/providers/microsoft/azure/transfers/oracle_to_azure_data_lake.py index 3e632e512f6..93e8a7b14d3 100644 --- a/airflow/providers/microsoft/azure/transfers/oracle_to_azure_data_lake.py +++ b/airflow/providers/microsoft/azure/transfers/oracle_to_azure_data_lake.py @@ -17,12 +17,11 @@ # under the License. from __future__ import annotations +import csv import os from tempfile import TemporaryDirectory from typing import TYPE_CHECKING, Any, Sequence -import unicodecsv as csv - from airflow.models import BaseOperator from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeHook from airflow.providers.oracle.hooks.oracle import OracleHook @@ -46,7 +45,7 @@ class OracleToAzureDataLakeOperator(BaseOperator): :param delimiter: field delimiter in the file. :param encoding: encoding type for the file. :param quotechar: Character to use in quoting. - :param quoting: Quoting strategy. See unicodecsv quoting for more information. + :param quoting: Quoting strategy. See csv library for more information. 
""" template_fields: Sequence[str] = ("filename", "sql", "sql_params") @@ -65,7 +64,7 @@ def __init__( delimiter: str = ",", encoding: str = "utf-8", quotechar: str = '"', - quoting: str = csv.QUOTE_MINIMAL, + quoting: int = csv.QUOTE_MINIMAL, **kwargs, ) -> None: super().__init__(**kwargs) @@ -83,11 +82,10 @@ def __init__( self.quoting = quoting def _write_temp_file(self, cursor: Any, path_to_save: str | bytes | int) -> None: - with open(path_to_save, "wb") as csvfile: + with open(path_to_save, "w", encoding=self.encoding) as csvfile: csv_writer = csv.writer( csvfile, delimiter=self.delimiter, - encoding=self.encoding, quotechar=self.quotechar, quoting=self.quoting, ) diff --git a/airflow/providers/mysql/transfers/vertica_to_mysql.py b/airflow/providers/mysql/transfers/vertica_to_mysql.py index 5f2274c6b4d..16be186fc70 100644 --- a/airflow/providers/mysql/transfers/vertica_to_mysql.py +++ b/airflow/providers/mysql/transfers/vertica_to_mysql.py @@ -17,12 +17,12 @@ # under the License. from __future__ import annotations +import csv from contextlib import closing from tempfile import NamedTemporaryFile from typing import TYPE_CHECKING, Sequence import MySQLdb -import unicodecsv as csv from airflow.models import BaseOperator from airflow.providers.mysql.hooks.mysql import MySqlHook @@ -125,11 +125,11 @@ def _bulk_load_transfer(self, mysql, vertica): with closing(conn.cursor()) as cursor: cursor.execute(self.sql) selected_columns = [d.name for d in cursor.description] - with NamedTemporaryFile("w") as tmpfile: + with NamedTemporaryFile("w", encoding="utf-8") as tmpfile: self.log.info("Selecting rows from Vertica to local file %s...", tmpfile.name) self.log.info(self.sql) - csv_writer = csv.writer(tmpfile, delimiter="\t", encoding="utf-8") + csv_writer = csv.writer(tmpfile, delimiter="\t") for row in cursor.iterate(): csv_writer.writerow(row) count += 1 diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index dc4da70a265..3d882285621 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -1554,6 +1554,7 @@ undead Undeads ungenerated unicode +unicodecsv unindent unittest unittests diff --git a/setup.cfg b/setup.cfg index a9ae40dff5d..35328a5aa86 100644 --- a/setup.cfg +++ b/setup.cfg @@ -146,7 +146,6 @@ install_requires = tenacity>=6.2.0,!=8.2.0 termcolor>=1.1.0 typing-extensions>=4.0.0 - unicodecsv>=0.14.1 werkzeug>=2.0 [options.packages.find] diff --git a/tests/providers/apache/hive/transfers/test_mssql_to_hive.py b/tests/providers/apache/hive/transfers/test_mssql_to_hive.py index 5ad8b329b61..693d594348b 100644 --- a/tests/providers/apache/hive/transfers/test_mssql_to_hive.py +++ b/tests/providers/apache/hive/transfers/test_mssql_to_hive.py @@ -71,9 +71,8 @@ def test_execute(self, mock_hive_hook, mock_mssql_hook, mock_tmp_file, mock_csv) mssql_to_hive_transfer.execute(context={}) mock_mssql_hook_cursor.return_value.execute.assert_called_once_with(mssql_to_hive_transfer.sql) - mock_csv.writer.assert_called_once_with( - mock_tmp_file, delimiter=mssql_to_hive_transfer.delimiter, encoding="utf-8" - ) + mock_tmp_file.assert_called_with(mode="w", encoding="utf-8") + mock_csv.writer.assert_called_once_with(mock_tmp_file, delimiter=mssql_to_hive_transfer.delimiter) field_dict = OrderedDict() for field in mock_mssql_hook_cursor.return_value.description: field_dict[field[0]] = mssql_to_hive_transfer.type_map(field[1]) diff --git a/tests/providers/apache/hive/transfers/test_mysql_to_hive.py b/tests/providers/apache/hive/transfers/test_mysql_to_hive.py index 
545ab610b83..a7cf8f025b5 100644 --- a/tests/providers/apache/hive/transfers/test_mysql_to_hive.py +++ b/tests/providers/apache/hive/transfers/test_mysql_to_hive.py @@ -17,6 +17,7 @@ # under the License. from __future__ import annotations +import csv import textwrap from collections import OrderedDict from contextlib import closing @@ -187,7 +188,6 @@ def baby_names_table(self): ) @pytest.mark.usefixtures("baby_names_table") def test_mysql_to_hive(self, spy_on_hive, params, expected, csv): - sql = "SELECT * FROM baby_names LIMIT 1000;" op = MySqlToHiveOperator( task_id="test_m2h", @@ -247,7 +247,6 @@ def test_mysql_to_hive_type_conversion(self, spy_on_hive): cursor.execute(f"DROP TABLE IF EXISTS {mysql_table}") def test_mysql_to_hive_verify_csv_special_char(self, spy_on_hive): - mysql_table = "test_mysql_to_hive" hive_table = "test_mysql_to_hive" @@ -277,8 +276,6 @@ def test_mysql_to_hive_verify_csv_special_char(self, spy_on_hive): ) conn.commit() - import unicodecsv as csv - op = MySqlToHiveOperator( task_id="test_m2h", hive_cli_conn_id="hive_cli_default", diff --git a/tests/providers/google/cloud/transfers/test_sql_to_gcs.py b/tests/providers/google/cloud/transfers/test_sql_to_gcs.py index c82e9d118e7..28243633053 100644 --- a/tests/providers/google/cloud/transfers/test_sql_to_gcs.py +++ b/tests/providers/google/cloud/transfers/test_sql_to_gcs.py @@ -22,7 +22,6 @@ import pandas as pd import pytest -import unicodecsv as csv from airflow.providers.google.cloud.hooks.gcs import GCSHook from airflow.providers.google.cloud.transfers.sql_to_gcs import BaseSQLToGCSOperator @@ -87,36 +86,21 @@ def query(self): class TestBaseSQLToGCSOperator: @mock.patch("airflow.providers.google.cloud.transfers.sql_to_gcs.NamedTemporaryFile") - @mock.patch.object(csv.writer, "writerow") + @mock.patch("csv.writer") @mock.patch.object(GCSHook, "upload") @mock.patch.object(DummySQLToGCSOperator, "query") @mock.patch.object(DummySQLToGCSOperator, "convert_type") - def test_exec(self, mock_convert_type, mock_query, mock_upload, mock_writerow, mock_tempfile): + def test_exec(self, mock_convert_type, mock_query, mock_upload, mock_writer, mock_tempfile): cursor_mock = Mock() cursor_mock.description = CURSOR_DESCRIPTION cursor_mock.__iter__ = Mock(return_value=iter(INPUT_DATA)) mock_query.return_value = cursor_mock mock_convert_type.return_value = "convert_type_return_value" - mock_file = Mock() - - mock_tell = Mock() - mock_tell.return_value = 3 - mock_file.tell = mock_tell - - mock_flush = Mock() - mock_file.flush = mock_flush - - mock_close = Mock() - mock_file.close = mock_close - + mock_file = mock_tempfile.return_value + mock_file.tell.return_value = 3 mock_file.name = TMP_FILE_NAME - mock_write = Mock() - mock_file.write = mock_write - - mock_tempfile.return_value = mock_file - # Test CSV operator = DummySQLToGCSOperator( sql=SQL, @@ -145,18 +129,16 @@ def test_exec(self, mock_convert_type, mock_query, mock_upload, mock_writerow, m } mock_query.assert_called_once() - mock_writerow.assert_has_calls( - [ - mock.call(COLUMNS), - mock.call(ROW), - mock.call(COLUMNS), - mock.call(ROW), - mock.call(COLUMNS), - mock.call(ROW), - mock.call(COLUMNS), - ] - ) - mock_flush.assert_has_calls([mock.call(), mock.call(), mock.call(), mock.call()]) + assert mock_writer.return_value.writerow.call_args_list == [ + mock.call(COLUMNS), + mock.call(ROW), + mock.call(COLUMNS), + mock.call(ROW), + mock.call(COLUMNS), + mock.call(ROW), + mock.call(COLUMNS), + ] + mock_file.flush.assert_has_calls([mock.call(), mock.call(), mock.call(), 
mock.call()]) csv_calls = [] for i in range(0, 3): csv_calls.append( @@ -174,12 +156,12 @@ def test_exec(self, mock_convert_type, mock_query, mock_upload, mock_writerow, m ) upload_calls = [json_call, csv_calls[0], csv_calls[1], csv_calls[2]] mock_upload.assert_has_calls(upload_calls) - mock_close.assert_has_calls([mock.call(), mock.call(), mock.call(), mock.call()]) + mock_file.close.assert_has_calls([mock.call(), mock.call(), mock.call(), mock.call()]) mock_query.reset_mock() - mock_flush.reset_mock() + mock_file.flush.reset_mock() mock_upload.reset_mock() - mock_close.reset_mock() + mock_file.close.reset_mock() cursor_mock.reset_mock() cursor_mock.__iter__ = Mock(return_value=iter(INPUT_DATA)) @@ -200,26 +182,24 @@ def test_exec(self, mock_convert_type, mock_query, mock_upload, mock_writerow, m } mock_query.assert_called_once() - mock_write.assert_has_calls( - [ - mock.call(OUTPUT_DATA), - mock.call(b"\n"), - mock.call(OUTPUT_DATA), - mock.call(b"\n"), - mock.call(OUTPUT_DATA), - mock.call(b"\n"), - ] - ) - mock_flush.assert_called_once() + mock_file.write.call_args_list == [ + mock.call(OUTPUT_DATA), + mock.call(b"\n"), + mock.call(OUTPUT_DATA), + mock.call(b"\n"), + mock.call(OUTPUT_DATA), + mock.call(b"\n"), + ] mock_upload.assert_called_once_with( BUCKET, FILENAME.format(0), TMP_FILE_NAME, mime_type=APP_JSON, gzip=False, metadata=None ) - mock_close.assert_called_once() + mock_file.close.assert_called_once() mock_query.reset_mock() - mock_flush.reset_mock() + mock_file.flush.reset_mock() mock_upload.reset_mock() - mock_close.reset_mock() + mock_file.close.reset_mock() + mock_file.write.reset_mock() cursor_mock.reset_mock() cursor_mock.__iter__ = Mock(return_value=iter(INPUT_DATA)) @@ -246,18 +226,16 @@ def test_exec(self, mock_convert_type, mock_query, mock_upload, mock_writerow, m } mock_query.assert_called_once() - mock_write.assert_has_calls( - [ - mock.call(OUTPUT_DATA), - mock.call(b"\n"), - mock.call(OUTPUT_DATA), - mock.call(b"\n"), - mock.call(OUTPUT_DATA), - mock.call(b"\n"), - ] - ) - - mock_flush.assert_called_once() + mock_file.write.call_args_list == [ + mock.call(OUTPUT_DATA), + mock.call(b"\n"), + mock.call(OUTPUT_DATA), + mock.call(b"\n"), + mock.call(OUTPUT_DATA), + mock.call(b"\n"), + ] + + mock_file.flush.assert_called_once() mock_upload.assert_called_once_with( BUCKET, FILENAME.format(0), @@ -266,12 +244,12 @@ def test_exec(self, mock_convert_type, mock_query, mock_upload, mock_writerow, m gzip=False, metadata={"row_count": 3}, ) - mock_close.assert_called_once() + mock_file.close.assert_called_once() mock_query.reset_mock() - mock_flush.reset_mock() + mock_file.flush.reset_mock() mock_upload.reset_mock() - mock_close.reset_mock() + mock_file.close.reset_mock() cursor_mock.reset_mock() cursor_mock.__iter__ = Mock(return_value=iter(INPUT_DATA)) @@ -296,7 +274,7 @@ def test_exec(self, mock_convert_type, mock_query, mock_upload, mock_writerow, m } mock_query.assert_called_once() - mock_flush.assert_called_once() + mock_file.flush.assert_called_once() mock_upload.assert_called_once_with( BUCKET, FILENAME.format(0), @@ -305,12 +283,12 @@ def test_exec(self, mock_convert_type, mock_query, mock_upload, mock_writerow, m gzip=False, metadata=None, ) - mock_close.assert_called_once() + mock_file.close.assert_called_once() mock_query.reset_mock() - mock_flush.reset_mock() + mock_file.flush.reset_mock() mock_upload.reset_mock() - mock_close.reset_mock() + mock_file.close.reset_mock() cursor_mock.reset_mock() cursor_mock.__iter__ = Mock(return_value=iter(INPUT_DATA)) @@ 
-351,8 +329,8 @@ def test_exec(self, mock_convert_type, mock_query, mock_upload, mock_writerow, m } mock_query.assert_called_once() - assert mock_flush.call_count == 3 - assert mock_close.call_count == 3 + assert mock_file.flush.call_count == 3 + assert mock_file.close.call_count == 3 mock_upload.assert_has_calls( [ mock.call( @@ -368,9 +346,9 @@ def test_exec(self, mock_convert_type, mock_query, mock_upload, mock_writerow, m ) mock_query.reset_mock() - mock_flush.reset_mock() + mock_file.flush.reset_mock() mock_upload.reset_mock() - mock_close.reset_mock() + mock_file.close.reset_mock() cursor_mock.reset_mock() cursor_mock.__iter__ = Mock(return_value=iter(INPUT_DATA)) @@ -396,7 +374,7 @@ def test_exec(self, mock_convert_type, mock_query, mock_upload, mock_writerow, m "files": [{"file_name": "test_results_0.csv", "file_mime_type": "text/csv", "file_row_count": 3}], } - mock_writerow.assert_has_calls( + mock_writer.return_value.writerow.assert_has_calls( [ mock.call(COLUMNS), mock.call(["NULL", "NULL", "NULL"]), diff --git a/tests/providers/microsoft/azure/transfers/test_oracle_to_azure_data_lake.py b/tests/providers/microsoft/azure/transfers/test_oracle_to_azure_data_lake.py index 52feffeedfe..fa5f7283d23 100644 --- a/tests/providers/microsoft/azure/transfers/test_oracle_to_azure_data_lake.py +++ b/tests/providers/microsoft/azure/transfers/test_oracle_to_azure_data_lake.py @@ -17,13 +17,12 @@ # under the License. from __future__ import annotations +import csv import os from tempfile import TemporaryDirectory from unittest import mock from unittest.mock import MagicMock -import unicodecsv as csv - from airflow.providers.microsoft.azure.transfers.oracle_to_azure_data_lake import ( OracleToAzureDataLakeOperator, ) @@ -69,8 +68,8 @@ def test_write_temp_file(self): assert os.path.exists(os.path.join(temp, filename)) == 1 - with open(os.path.join(temp, filename), "rb") as csvfile: - temp_file = csv.reader(csvfile, delimiter=delimiter, encoding=encoding) + with open(os.path.join(temp, filename), encoding=encoding) as csvfile: + temp_file = csv.reader(csvfile, delimiter=delimiter) rownum = 0 for row in temp_file:
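
The change applied throughout the patch follows one pattern: unicodecsv wrote encoded bytes, so files were opened in binary mode and the writer/reader took an encoding argument, whereas the standard-library csv module works on text-mode file objects, so the encoding moves to open()/NamedTemporaryFile() and the encoding keyword disappears from csv.writer()/csv.reader(). A minimal sketch of that migration pattern (the file handling and sample data below are illustrative only, not taken from the patch):

    import csv
    from tempfile import NamedTemporaryFile

    rows = [["id", "name"], [1, "Grüße"]]  # sample data for illustration only

    # Before (unicodecsv): binary-mode file, encoding passed to the writer.
    #   with open("out.csv", "wb") as f:
    #       unicodecsv.writer(f, delimiter=",", encoding="utf-8").writerows(rows)

    # After (stdlib csv): text-mode file, encoding passed when opening the file.
    # newline="" is what the csv docs recommend for text-mode files, to avoid
    # platform-dependent extra blank lines between rows.
    with NamedTemporaryFile(mode="w", encoding="utf-8", newline="", delete=False) as f:
        writer = csv.writer(f, delimiter=",", quoting=csv.QUOTE_MINIMAL)
        writer.writerows(rows)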