Skip to content

Commit

Permalink
Merge branch 'MAINTENANCE/GDOC-219' of https://github.com/great-expec…
Browse files Browse the repository at this point in the history
…tations/great_expectations into MAINTENANCE/GDOC-219/tests_datasource_sparkdf_pandas

* 'MAINTENANCE/GDOC-219' of https://github.com/great-expectations/great_expectations:
  [MAINTENANCE] Tests for RuntimeDataConnector at DataContext-level (#3304)
  [BUGFIX] SQL dialect doesn't register for BigQuery for V2 (#3324)
  [WIP] [FEATURE] add backend args to run_diagnostics (#3257)
  • Loading branch information
Shinnnyshinshin committed Aug 30, 2021
2 parents 51b9a4a + 572796c commit c1d6f60
Show file tree
Hide file tree
Showing 6 changed files with 222 additions and 65 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import json
import logging
import traceback
from typing import Any, Dict, Optional, Tuple

import numpy as np
Expand Down Expand Up @@ -26,11 +28,13 @@
ColumnMetricProvider,
column_aggregate_value,
)
from great_expectations.expectations.metrics.import_manager import F, sa
from great_expectations.expectations.metrics.metric_provider import (
MetricProvider,
metric_value,
from great_expectations.expectations.metrics.column_aggregate_metric_provider import (
ColumnAggregateMetricProvider,
column_aggregate_partial,
column_aggregate_value,
)
from great_expectations.expectations.metrics.import_manager import F, sa
from great_expectations.expectations.metrics.metric_provider import metric_value
from great_expectations.expectations.util import render_evaluation_parameter_string
from great_expectations.render.renderer.renderer import renderer
from great_expectations.render.types import RenderedStringTemplateContent
Expand All @@ -42,6 +46,32 @@
)
from great_expectations.validator.validation_graph import MetricConfiguration

logger = logging.getLogger(__name__)

# SQLAlchemy is an optional dependency.  If it is missing, fall back to None
# sentinels so this module still imports; the SQL execution path is then
# simply unavailable.
try:
    from sqlalchemy.exc import ProgrammingError
    from sqlalchemy.sql import Select
except ImportError:
    logger.debug(
        "Unable to load SqlAlchemy context; install optional sqlalchemy dependency for support"
    )
    ProgrammingError = None
    Select = None

# The result-row class moved between SQLAlchemy releases: 1.4+ exposes
# sqlalchemy.engine.row.Row, while 1.3 exposes RowProxy.  Alias whichever
# is available to `Row`.
try:
    from sqlalchemy.engine.row import Row
except ImportError:
    try:
        # Pre-1.4 releases have no sqlalchemy.engine.row module at all, so
        # RowProxy must be imported from its 1.3 location
        # (sqlalchemy.engine.result), not from the same module whose absence
        # just triggered this fallback — the original import could never
        # succeed.
        from sqlalchemy.engine.result import RowProxy

        Row = RowProxy
    except ImportError:
        logger.debug(
            "Unable to load SqlAlchemy Row class; please upgrade your sqlalchemy installation to the latest version."
        )
        RowProxy = None
        Row = None


class ColumnSkew(ColumnMetricProvider):
"""MetricProvider Class for Aggregate Mean MetricProvider"""
Expand All @@ -55,34 +85,80 @@ def _pandas(cls, column, abs=False, **kwargs):
return np.abs(stats.skew(column))
return stats.skew(column)

#
# @metric_value(engine=SqlAlchemyExecutionEngine, metric_fn_type="value")
# def _sqlalchemy(
# cls,
# execution_engine: "SqlAlchemyExecutionEngine",
# metric_domain_kwargs: Dict,
# metric_value_kwargs: Dict,
# metrics: Dict[Tuple, Any],
# runtime_configuration: Dict,
# ):
# (
# selectable,
# compute_domain_kwargs,
# accessor_domain_kwargs,
# ) = execution_engine.get_compute_domain(
# metric_domain_kwargs, MetricDomainTypes.COLUMN
# )
# column_name = accessor_domain_kwargs["column"]
# column = sa.column(column_name)
# sqlalchemy_engine = execution_engine.engine
# dialect = sqlalchemy_engine.dialect
#
# column_median = None
#
# # TODO: compute the value and return it
#
# return column_median
#
@metric_value(engine=SqlAlchemyExecutionEngine)
def _sqlalchemy(
    cls,
    execution_engine: "SqlAlchemyExecutionEngine",
    metric_domain_kwargs: Dict,
    metric_value_kwargs: Dict,
    metrics: Dict[Tuple, Any],
    runtime_configuration: Dict,
):
    """Compute the column's sample skewness via SQL aggregate queries.

    Runs one aggregate SELECT each for the mean, the row count, the sample
    standard deviation, and the third central moment, then combines them
    into the skewness statistic.  When metric_value_kwargs["abs"] is truthy,
    the absolute value of the skewness is returned.
    """
    (
        selectable,
        compute_domain_kwargs,
        accessor_domain_kwargs,
    ) = execution_engine.get_compute_domain(
        metric_domain_kwargs, MetricDomainTypes.COLUMN
    )

    column = sa.column(accessor_domain_kwargs["column"])
    sqlalchemy_engine = execution_engine.engine

    # The * 1.0 coerces integer columns to a floating type so AVG is not
    # truncated by integer division on some dialects.
    mean = _get_query_result(
        func=sa.func.avg(column * 1.0),
        selectable=selectable,
        sqlalchemy_engine=sqlalchemy_engine,
    )

    row_count = _get_query_result(
        func=sa.func.count(column),
        selectable=selectable,
        sqlalchemy_engine=sqlalchemy_engine,
    )

    # MSSQL spells the sample standard deviation STDEV; most other dialects
    # use the standard STDDEV_SAMP.
    std_dev_func = (
        sa.func.stdev(column)
        if sqlalchemy_engine.dialect.name.lower() == "mssql"
        else sa.func.stddev_samp(column)
    )
    std_dev = _get_query_result(
        func=std_dev_func,
        selectable=selectable,
        sqlalchemy_engine=sqlalchemy_engine,
    )

    third_moment = _get_query_result(
        func=sa.func.sum(sa.func.pow(column - mean, 3)),
        selectable=selectable,
        sqlalchemy_engine=sqlalchemy_engine,
    )

    skew = third_moment / (std_dev ** 3) / (row_count - 1)
    return np.abs(skew) if metric_value_kwargs["abs"] else skew


def _get_query_result(func, selectable, sqlalchemy_engine):
    """Execute a single-aggregate SELECT and return its scalar result.

    Args:
        func: a SQLAlchemy aggregate expression (e.g. ``sa.func.avg(col)``).
        selectable: the table or other selectable to select from.
        sqlalchemy_engine: the SQLAlchemy engine used to execute the query.

    Returns:
        The first column of the first result row.

    Raises:
        ProgrammingError: re-raised (after logging) on SQL syntax errors.
    """
    simple_query: Select = sa.select(func).select_from(selectable)

    try:
        result: Row = sqlalchemy_engine.execute(simple_query).fetchone()[0]
        return result
    except ProgrammingError as pe:
        exception_message: str = "An SQL syntax Exception occurred."
        exception_traceback: str = traceback.format_exc()
        exception_message += (
            f'{type(pe).__name__}: "{str(pe)}". Traceback: "{exception_traceback}".'
        )
        logger.error(exception_message)
        # Re-raise the caught exception instance.  The original `raise pe()`
        # tried to *call* the instance, which itself raises
        # "TypeError: ... object is not callable" and masks the real
        # database error.
        raise pe

#
# @metric_value(engine=SparkDFExecutionEngine, metric_fn_type="value")
# def _spark(
# cls,
Expand Down Expand Up @@ -229,27 +305,31 @@ class ExpectColumnSkewToBeBetween(ColumnExpectation):
"title": "positive_test_positive_skew",
"exact_match_out": False,
"include_in_gallery": True,
"tolerance": 0.1,
"in": {"column": "a", "min_value": 0.25, "max_value": 10},
"out": {"success": True, "observed_value": 1.6974323016687487},
},
{
"title": "negative_test_no_skew",
"exact_match_out": False,
"include_in_gallery": True,
"tolerance": 0.1,
"in": {"column": "b", "min_value": 0.25, "max_value": 10},
"out": {"success": False, "observed_value": -0.07638895580386174},
},
{
"title": "positive_test_negative_skew",
"exact_match_out": False,
"include_in_gallery": True,
"tolerance": 0.1,
"in": {"column": "c", "min_value": -10, "max_value": -0.5},
"out": {"success": True, "observed_value": -0.9979514313860596},
},
{
"title": "negative_test_abs_skew",
"exact_match_out": False,
"include_in_gallery": True,
"tolerance": 0.1,
"in": {
"column": "c",
"abs": True,
Expand All @@ -262,6 +342,7 @@ class ExpectColumnSkewToBeBetween(ColumnExpectation):
"title": "positive_test_abs_skew",
"exact_match_out": False,
"include_in_gallery": True,
"tolerance": 0.1,
"in": {
"column": "c",
"abs": True,
Expand All @@ -271,7 +352,17 @@ class ExpectColumnSkewToBeBetween(ColumnExpectation):
"out": {"success": True, "observed_value": 0.9979514313860596},
},
],
},
"test_backends": [
{
"backend": "pandas",
"dialects": None,
},
{
"backend": "sqlalchemy",
"dialects": ["mysql", "postgresql"],
},
],
}
]

# This dictionary contains metadata for display in the public gallery
Expand Down Expand Up @@ -401,4 +492,5 @@ def _validate(

# Running this module directly generates the expectation's self-check
# (diagnostics) report and prints it as indented JSON.
if __name__ == "__main__":
self_check_report = ExpectColumnSkewToBeBetween().run_diagnostics()

print(json.dumps(self_check_report, indent=2))
1 change: 1 addition & 0 deletions docs/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ title: Changelog
---

### Develop
* [FEATURE] Add "test_backends" key to Expectation.examples for specifying test backends and dialects (#3257)


### 0.13.31
Expand Down
1 change: 0 additions & 1 deletion great_expectations/dataset/sqlalchemy_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,6 @@ def __init__(
"sqlite",
"oracle",
"mssql",
"bigquery",
]:
# These are the officially included and supported dialects by sqlalchemy
self.dialect = import_library_module(
Expand Down
10 changes: 5 additions & 5 deletions great_expectations/expectations/expectation.py
Original file line number Diff line number Diff line change
Expand Up @@ -785,21 +785,21 @@ def run_diagnostics(self, pretty_print=True):
report_obj, e, traceback.format_exc()
)

execution_engines = None
introspected_execution_engines = None
if upstream_metrics is not None:
execution_engines = self._get_execution_engine_dict(
introspected_execution_engines = self._get_execution_engine_dict(
upstream_metrics=upstream_metrics,
)
report_obj.update({"execution_engines": execution_engines})
report_obj.update({"execution_engines": introspected_execution_engines})

try:
tests = self._get_examples(return_only_gallery_examples=False)
if len(tests) > 0:
if execution_engines is not None:
if introspected_execution_engines is not None:
test_results = self._get_test_results(
snake_name,
tests,
execution_engines,
introspected_execution_engines,
)
report_obj.update({"test_report": test_results})
except Exception as e:
Expand Down
71 changes: 51 additions & 20 deletions great_expectations/self_check/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -1361,6 +1361,7 @@ def build_test_backends_list(
include_pandas=True,
include_spark=True,
include_sqlalchemy=True,
include_sqlite=True,
include_postgresql=False,
include_mysql=False,
include_mssql=False,
Expand Down Expand Up @@ -1388,7 +1389,8 @@ def build_test_backends_list(
"sqlalchemy tests are requested, but sqlalchemy in not installed"
)

test_backends += ["sqlite"]
if include_sqlite:
test_backends += ["sqlite"]

if include_postgresql:
###
Expand Down Expand Up @@ -1471,28 +1473,56 @@ def generate_expectation_tests(
"""
parametrized_tests = []

# use the expectation_execution_engines_dict (if provided) to request only the appropriate backends
if expectation_execution_engines_dict is not None:
backends = build_test_backends_list(
include_pandas=expectation_execution_engines_dict.get(
"PandasExecutionEngine"
)
== True,
include_spark=expectation_execution_engines_dict.get(
"SparkDFExecutionEngine"
# If Expectation.examples defines "test_backends", use that to determine backends and dialects to use.
# Otherwise, use the introspected expectation_execution_engines_dict.
for d in examples_config:
d = copy.deepcopy(d)
if expectation_execution_engines_dict is not None:
example_backends_is_defined = "test_backends" in d
example_backends = [
backend_dict.get("backend")
for backend_dict in d.get("test_backends", [])
]
example_sqlalchemy_dialects = [
dialect
for backend_dict in d.get("test_backends", {})
if (backend_dict.get("backend") == "sqlalchemy")
for dialect in backend_dict.get("dialects", [])
]
include_sqlalchemy = (
("sqlalchemy" in example_backends)
if example_backends_is_defined
else (
expectation_execution_engines_dict.get("SqlAlchemyExecutionEngine")
== True
)
)
== True,
include_sqlalchemy=expectation_execution_engines_dict.get(
"SqlAlchemyExecutionEngine"
backends = build_test_backends_list(
include_pandas=("pandas" in example_backends)
if example_backends_is_defined
else (
expectation_execution_engines_dict.get("PandasExecutionEngine")
== True
),
include_spark=("spark" in example_backends)
if example_backends_is_defined
else (
expectation_execution_engines_dict.get("SparkDFExecutionEngine")
== True
),
include_sqlalchemy=include_sqlalchemy,
include_sqlite=("sqlite" in example_sqlalchemy_dialects)
if example_backends_is_defined
else include_sqlalchemy,
include_postgresql=("postgresql" in example_sqlalchemy_dialects),
include_mysql=("mysql" in example_sqlalchemy_dialects),
include_mssql=("mssql" in example_sqlalchemy_dialects),
)
== True,
)
else:
backends = build_test_backends_list()
else:
backends = build_test_backends_list()

for c in backends:

for c in backends:
for d in examples_config:
d = copy.deepcopy(d)
datasets = []
if candidate_test_is_on_temporary_notimplemented_list_cfe(
c, expectation_type
Expand Down Expand Up @@ -1856,6 +1886,7 @@ def check_json_test_result(test, result, data_asset=None):
# NOTE - 20191031 - JPC - we may eventually want to change these tests as we update our view on how
# representations, serializations, and objects should interact and how much of that is shown to the user.
result = result.to_json_dict()
print(result)
for key, value in test["out"].items():
# Apply our great expectations-specific test logic

Expand Down

0 comments on commit c1d6f60

Please sign in to comment.