[FEATURE] Enable Expectations tests for BigQuery #3219
Changes from 77 commits
New file (an Azure Pipelines configuration), `@@ -0,0 +1,112 @@`:

```yaml
stages:
  - stage: lint
    pool:
      vmImage: 'ubuntu-latest'

    jobs:
      - job: lint
        steps:
          - task: UsePythonVersion@0
            inputs:
              versionSpec: 3.7
            displayName: 'Use Python 3.7'

          - script: |
              pip install isort[requirements]==5.4.2 flake8==3.8.3 black==20.8b1 pyupgrade==2.7.2
              EXIT_STATUS=0
              isort . --check-only --skip docs/ || EXIT_STATUS=$?
              black --check --exclude docs/ . || EXIT_STATUS=$?
              flake8 great_expectations/core || EXIT_STATUS=$?
              pyupgrade --py3-plus || EXIT_STATUS=$?
              exit $EXIT_STATUS

  - stage: cloud_db_integration_expectations_cfe
    pool:
      vmImage: 'ubuntu-latest'

    dependsOn: [lint]

    jobs:
      - job: bigquery_expectations_cfe
        timeoutInMinutes: 0 # Maximize the time that pipelines remain open (6 hours currently)

        variables:
          python.version: '3.8'

        steps:
          # delay the execution of the second stage so that we do not hit the rate limit for BigQuery
          # - bash: sleep 5m
          #   displayName: Delay for BigQuery rate limit

          - task: UsePythonVersion@0
            inputs:
              versionSpec: '$(python.version)'
            displayName: 'Use Python $(python.version)'

          - bash: python -m pip install --upgrade pip==20.2.4
            displayName: 'Update pip'

          - script: |
              pip install -r requirements-dev.txt
            displayName: 'Install dependencies'

          - task: DownloadSecureFile@1
            name: gcp_authkey
            displayName: 'Download Google Service Account'
            inputs:
              secureFile: 'superconductive-service-acct.json'
              retryCount: '2'

          - script: |
              pip install pytest pytest-azurepipelines
              pytest -v --no-spark --no-postgresql --bigquery --napoleon-docstrings --junitxml=junit/test-results.xml --cov=. --cov-report=xml --cov-report=html --ignore=tests/cli --ignore=tests/integration/usage_statistics tests/test_definitions/test_expectations_cfe.py
            displayName: 'pytest'
            env:
              GOOGLE_APPLICATION_CREDENTIALS: $(gcp_authkey.secureFilePath)
              GCP_PROJECT: $(GCP_PROJECT)
              GCP_BIGQUERY_DATASET: $(GCP_BIGQUERY_DATASET)

  - stage: cloud_db_integration_expectations
    pool:
      vmImage: 'ubuntu-latest'

    dependsOn: [cloud_db_integration_expectations_cfe, lint]
    jobs:
      - job: bigquery_expectations
        timeoutInMinutes: 0 # Maximize the time that pipelines remain open (6 hours currently)

        variables:
          python.version: '3.8'

        steps:
          - task: UsePythonVersion@0
            inputs:
              versionSpec: '$(python.version)'
            displayName: 'Use Python $(python.version)'

          - bash: python -m pip install --upgrade pip==20.2.4
            displayName: 'Update pip'

          - script: |
              pip install -r requirements-dev.txt
            displayName: 'Install dependencies'

          - task: DownloadSecureFile@1
            name: gcp_authkey
            displayName: 'Download Google Service Account'
            inputs:
              secureFile: 'superconductive-service-acct.json'
              retryCount: '2'

          - script: |
              pip install pytest pytest-azurepipelines
              pytest -v --no-spark --no-postgresql --bigquery --napoleon-docstrings --junitxml=junit/test-results.xml --cov=. --cov-report=xml --cov-report=html --ignore=tests/cli --ignore=tests/integration/usage_statistics tests/test_definitions/test_expectations.py
            displayName: 'pytest'
            env:
              GOOGLE_APPLICATION_CREDENTIALS: $(gcp_authkey.secureFilePath)
              GCP_PROJECT: $(GCP_PROJECT)
              GCP_BIGQUERY_DATASET: $(GCP_BIGQUERY_DATASET)
```
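The lint job's `|| EXIT_STATUS=$?` idiom runs every linter even when an earlier one fails, then exits with the most recently recorded failure code. A minimal Python sketch of the same accumulate-and-continue pattern (the `true`/`false` commands are stand-ins for the real linters):

```python
import subprocess

def run_all(commands):
    """Run every command; remember the most recent non-zero exit code."""
    exit_status = 0
    for cmd in commands:
        result = subprocess.run(cmd, shell=True)
        if result.returncode != 0:
            # mirrors the shell idiom `some_linter || EXIT_STATUS=$?`
            exit_status = result.returncode
    return exit_status

# `false` exits 1; the later `true` still runs and does not reset the status.
print(run_all(["true", "false", "true"]))  # 1
```

The point of the pattern is that a single CI run reports every linter's findings at once, instead of stopping at the first failure.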
---

@@ -17,6 +17,26 @@ For example, you can run `pytest --no-spark --no-sqlalchemy` to skip all local b

Note: as of early 2020, the tests generate many warnings. Most of these are generated by dependencies (pandas, sqlalchemy, etc.). You can suppress them with pytest's `--disable-pytest-warnings` flag: `pytest --no-spark --no-sqlalchemy --disable-pytest-warnings`.

#### BigQuery tests

In order to run BigQuery tests, you first need to go through the following steps:

1. Select or create a Cloud Platform project.
2. Set up authentication.
3. In your project, create a BigQuery dataset named `test_ci` and set the dataset's default table expiration to `.1` days.

* Select or create a Cloud Platform project: [https://console.cloud.google.com/project](https://console.cloud.google.com/project)
* Set up authentication: [https://googleapis.dev/python/google-api-core/latest/auth.html](https://googleapis.dev/python/google-api-core/latest/auth.html)
* In your project, create a BigQuery dataset named `test_ci`: [https://cloud.google.com/bigquery/docs/datasets](https://cloud.google.com/bigquery/docs/datasets)
* Set the dataset's default table expiration to `.1` days: [https://cloud.google.com/bigquery/docs/updating-datasets#table-expiration](https://cloud.google.com/bigquery/docs/updating-datasets#table-expiration)

After setting up authentication, you can point the tests at your project with the `GE_TEST_BIGQUERY_PROJECT` environment variable, e.g.

```bash
GE_TEST_BIGQUERY_PROJECT=<YOUR_GOOGLE_CLOUD_PROJECT>
pytest tests/test_definitions/test_expectations_cfe.py --bigquery --no-spark --no-postgresql
```
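The environment-variable gate described above can be sketched in a few lines. This is only an illustration of the pattern, not the repository's actual test fixture; the helper name is hypothetical:

```python
import os

def bigquery_project_or_none():
    """Return the configured test project, or None when BigQuery tests should be skipped."""
    # GE_TEST_BIGQUERY_PROJECT is the variable named in the contributing docs.
    return os.environ.get("GE_TEST_BIGQUERY_PROJECT") or None

project = bigquery_project_or_none()
if project is None:
    print("BigQuery tests would be skipped: GE_TEST_BIGQUERY_PROJECT is unset")
```

In a real suite this check would typically back a `pytest` skip marker so unconfigured contributors are not forced to set up Google Cloud credentials.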
> **Reviewer:** Super nitpicky, but could we capitalize any usages of BigQuery in this file (and any other …
>
> **Author:** done!
### Writing unit and integration tests

Production code in Great Expectations must be thoroughly tested. In general, we insist on unit tests for all branches of every method, including likely error states. Most new feature contributions should include several unit tests. Contributions that modify or extend existing features should include a test of the new behavior.

@@ -25,7 +45,7 @@ Experimental code in Great Expectations need only be tested lightly. We are movi

Most of Great Expectations' integration testing is in the CLI, which naturally exercises most of the core code paths. Because integration tests require a lot of developer time to maintain, most contributions should not include new integration tests, unless they change the CLI itself.

```diff
-Note: we do not currently test Great Expectations against all types of SQL database. CI test coverage for SQL is limited to postgresql and sqlite. We have observed some bugs because of unsupported features or differences in SQL dialects, and we are actively working to improve dialect-specific support and testing.
+Note: we do not currently test Great Expectations against all types of SQL database. CI test coverage for SQL is limited to postgresql, sqlite, mssql, and bigquery. We have observed some bugs because of unsupported features or differences in SQL dialects, and we are actively working to improve dialect-specific support and testing.
```

### Unit tests for Expectations

One of Great Expectations' important promises is that the same Expectation will produce the same result across all supported execution environments: pandas, sqlalchemy, and Spark.
---

@@ -548,6 +548,7 @@ def __init__(

```diff
             "sqlite",
             "oracle",
             "mssql",
+            "bigquery",
         ]:
             # These are the officially included and supported dialects by sqlalchemy
             self.dialect = import_library_module(
```

@@ -571,6 +572,10 @@ def __init__(

```diff
             self.dialect = import_library_module(
                 module_name="pyathena.sqlalchemy_athena"
             )
+        elif self.engine.dialect.name.lower() == "bigquery":
+            self.dialect = import_library_module(
+                module_name="pybigquery.sqlalchemy_bigquery"
+            )
         else:
             self.dialect = None
```

> **Reviewer:** More of a style comment, but would it be possible to clean up this `if`/`elif` chain? (Suggested change.) Very minor but a nice boost to readability (plus we don't need to recreate a …

@@ -2093,9 +2098,7 @@ def _get_dialect_like_pattern_expression(self, column, like_pattern, positive=Tr

```diff
         try:
-            # Bigquery
-            if isinstance(
-                self.sql_engine_dialect, pybigquery.sqlalchemy_bigquery.BigQueryDialect
-            ):
+            if hasattr(self.sql_engine_dialect, "BigQueryDialect"):
                 dialect_supported = True
         except (
             AttributeError,
```

> **Reviewer:** Could we perhaps log something if we hit an exception? I know logging was causing some issues earlier, so avoid it if it blocks you. I was just thinking that it might be useful to get some more information than a standard …
>
> **Author:** Added
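The `isinstance` → `hasattr` change above replaces a hard dependency on `pybigquery` with duck typing: instead of importing the dialect class, the code only asks whether the loaded dialect object exposes a `BigQueryDialect` attribute. A minimal sketch of the difference, using stub modules rather than the real `pybigquery`:

```python
from types import ModuleType

# Stand-in for an imported BigQuery dialect module that exposes a BigQueryDialect class.
bigquery_stub = ModuleType("sqlalchemy_bigquery_stub")
bigquery_stub.BigQueryDialect = type("BigQueryDialect", (), {})

# Stand-in for some other dialect module without that attribute.
postgres_stub = ModuleType("postgres_stub")

def is_bigquery(dialect_module):
    """Duck-typed check: pybigquery never needs to be importable at call time."""
    return hasattr(dialect_module, "BigQueryDialect")

print(is_bigquery(bigquery_stub))   # True
print(is_bigquery(postgres_stub))  # False
```

The trade-off is the usual one for duck typing: any object that happens to carry a `BigQueryDialect` attribute passes the check, so the `hasattr` form is more permissive than the original `isinstance` test.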
---

@@ -62,20 +62,21 @@ def _sqlalchemy_window(cls, column, _table, **kwargs):

```diff
     # the column we will be performing the expectation on, and the query is performed against it.
     dialect = kwargs.get("_dialect", None)
     sql_engine = kwargs.get("_sqlalchemy_engine", None)
-    if sql_engine and dialect and dialect.dialect.name == "mysql":
-        temp_table_name = f"ge_tmp_{str(uuid.uuid4())[:8]}"
-        temp_table_stmt = "CREATE TEMPORARY TABLE {new_temp_table} AS SELECT tmp.{column_name} FROM {source_table} tmp".format(
-            new_temp_table=temp_table_name,
-            source_table=_table,
-            column_name=column.name,
-        )
-        sql_engine.execute(temp_table_stmt)
-        dup_query = (
-            sa.select([column])
-            .select_from(sa.text(temp_table_name))
-            .group_by(column)
-            .having(sa.func.count(column) > 1)
-        )
+    if sql_engine and dialect:
+        if hasattr(dialect, "dialect") and dialect.dialect.name == "mysql":
+            temp_table_name = f"ge_tmp_{str(uuid.uuid4())[:8]}"
+            temp_table_stmt = "CREATE TEMPORARY TABLE {new_temp_table} AS SELECT tmp.{column_name} FROM {source_table} tmp".format(
+                new_temp_table=temp_table_name,
+                source_table=_table,
+                column_name=column.name,
+            )
+            sql_engine.execute(temp_table_stmt)
+            dup_query = (
+                sa.select([column])
+                .select_from(sa.text(temp_table_name))
+                .group_by(column)
+                .having(sa.func.count(column) > 1)
+            )
         return column.notin_(dup_query)

 @column_condition_partial(
```

> **Reviewer:** I think it would be cleaner to force the sqlalchemy dialect to follow the convention of including a … It appears there are many usages of dialect, e.g. … Please consider merging into this branch #3259, which includes forcing BigQuery to have this.
>
> **Author:** Thank you very (very) much for this 🙇🏼

> **Reviewer:** nice work!
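The MySQL branch above materializes the column into a scratch table named `ge_tmp_<short-uuid>`, then finds duplicates with a `GROUP BY … HAVING COUNT(…) > 1` query. A runnable sketch of the same shape against stdlib `sqlite3` (table and column names here are illustrative; the real code targets MySQL through SQLAlchemy):

```python
import sqlite3
import uuid

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE source (val TEXT)")
conn.executemany("INSERT INTO source VALUES (?)", [("a",), ("b",), ("b",), ("c",)])

# Mirror the ge_tmp_<short-uuid> naming used for the scratch table.
temp_table = f"ge_tmp_{str(uuid.uuid4())[:8]}"
conn.execute(f"CREATE TEMPORARY TABLE {temp_table} AS SELECT val FROM source")

# Values appearing more than once, as in the HAVING count(...) > 1 query.
dupes = [row[0] for row in conn.execute(
    f"SELECT val FROM {temp_table} GROUP BY val HAVING COUNT(val) > 1"
)]
print(dupes)  # ['b']
```

The detour through a temp table exists because some dialects (MySQL here) cannot reference the table under test directly in the correlated duplicate query.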
---

@@ -123,7 +123,7 @@ def get_dialect_regex_expression(column, regex, dialect, positive=True):

```diff
     try:
-        # Bigquery
-        if issubclass(dialect.dialect, pybigquery.sqlalchemy_bigquery.BigQueryDialect):
+        if hasattr(dialect, "BigQueryDialect"):
             if positive:
                 return sa.func.REGEXP_CONTAINS(column, literal(regex))
             else:
```

@@ -251,10 +251,10 @@ and @@ -264,7 +264,7 @@ in `column_reflection_fallback` only strip trailing whitespace from the embedded mssql reflection query:

```python
        columns_query: str = f"""
        SELECT
            SCHEMA_NAME(tab.schema_id) AS schema_name,
            tab.name AS table_name,
            col.column_id AS column_id,
            col.name AS column_name,
            t.name AS column_data_type,
            col.max_length AS column_max_length,
            col.precision AS column_precision
        FROM sys.tables AS tab
```

```python
            ON col.user_type_id = t.user_type_id
        WHERE tab.name = '{selectable}'
        ORDER BY schema_name,
            table_name,
            column_id
        """
        col_info_query: TextClause = sa.text(columns_query)
```

@@ -319,21 +319,23 @@ def get_dialect_like_pattern_expression(column, dialect, like_pattern, positive=

```diff
     try:
-        # Bigquery
-        if isinstance(dialect, pybigquery.sqlalchemy_bigquery.BigQueryDialect):
+        if hasattr(dialect, "BigQueryDialect"):
             dialect_supported = True
     except (
         AttributeError,
         TypeError,
     ):  # TypeError can occur if the driver was not installed and so is None
         pass

-    if issubclass(
-        dialect.dialect,
-        (
-            sa.dialects.sqlite.dialect,
-            sa.dialects.postgresql.dialect,
-            sa.dialects.mysql.dialect,
-            sa.dialects.mssql.dialect,
+    if hasattr(dialect, "dialect") and (
+        issubclass(
+            dialect.dialect,
+            (
+                sa.dialects.sqlite.dialect,
+                sa.dialects.postgresql.dialect,
+                sa.dialects.mysql.dialect,
+                sa.dialects.mssql.dialect,
+            ),
         ),
     ):
         dialect_supported = True
```

> **Reviewer:** Same comment about …
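The final hunk guards the `issubclass` dispatch with `hasattr(dialect, "dialect")`, so dialect objects lacking a `.dialect` attribute (as BigQuery's did here) fall through cleanly instead of raising `AttributeError`. A minimal sketch with stub classes standing in for the SQLAlchemy dialect wrappers:

```python
class _SqliteDialect:  # stand-in for e.g. sa.dialects.sqlite.dialect
    pass

class _MysqlDialect:   # stand-in for e.g. sa.dialects.mysql.dialect
    pass

SUPPORTED = (_SqliteDialect, _MysqlDialect)

class SqliteWrapper:
    dialect = _SqliteDialect  # exposes `.dialect`, like the built-in dialect modules

class BigQueryWrapper:
    pass  # no `.dialect` attribute, like the BigQuery case in this PR

def dialect_supported(wrapper):
    # hasattr guard first, so wrappers without `.dialect` never reach issubclass
    return hasattr(wrapper, "dialect") and issubclass(wrapper.dialect, SUPPORTED)

print(dialect_supported(SqliteWrapper))    # True
print(dialect_supported(BigQueryWrapper))  # False
```

Because `and` short-circuits, the `issubclass` call is only ever evaluated when the attribute is known to exist, which is exactly what the guard in the diff buys.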
> **Reviewer:** On second thought, do we actually require `lint`? We are doing this check on `develop`, a branch that strictly disallows code to be merged if `lint` fails on its own pipeline. As such, aren't we guaranteed to have properly linted code by the time this check rolls around?
>
> **Author:** Great point. Linting is definitely handled by this point