Skip to content

Commit

Permalink
fix ray tests and coverage
Browse files Browse the repository at this point in the history
  • Loading branch information
goodwanghan committed Dec 28, 2022
1 parent 3412839 commit 49d3724
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 6 deletions.
6 changes: 3 additions & 3 deletions fugue/column/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ def where(self, condition: ColumnExpr, table: str) -> Iterable[Tuple[bool, str]]
lambda: ValueError(f"{condition} has aggregation functions"),
)
cond = self.generate(condition.alias(""))
yield (False, "SELECT * FROM ")
yield (False, "SELECT * FROM")
yield (True, table)
yield (False, f"WHERE {cond}")

Expand Down Expand Up @@ -312,14 +312,14 @@ def _having(as_where: bool = False) -> str:
if len(columns.literals) == 0:
expr = ", ".join(self.generate(x) for x in columns.all_cols)
if len(columns.group_keys) == 0:
yield (False, f"SELECT {distinct}{expr} FROM ")
yield (False, f"SELECT {distinct}{expr} FROM")
yield (True, table)
yield (False, _where())
yield (False, _having())
return
else:
keys = ", ".join(self.generate(x) for x in columns.group_keys)
yield (False, f"SELECT {distinct}{expr} FROM ")
yield (False, f"SELECT {distinct}{expr} FROM")
yield (True, table)
yield (False, _where())
yield (False, f"GROUP BY {keys}")
Expand Down
2 changes: 1 addition & 1 deletion fugue/workflow/input.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from typing import Type


def register_raw_df_type(df_type: Type) -> None:
def register_raw_df_type(df_type: Type) -> None: # pragma: no cover
"""TODO: This function is to be removed before 0.9.0
.. deprecated:: 3.1
Expand Down
2 changes: 1 addition & 1 deletion fugue_ray/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def __init__( # noqa: C901
schema = df.schema
metadata = None if not df.has_metadata else df.metadata
else:
raise ValueError(f"{df} is incompatible with DaskDataFrame")
raise ValueError(f"{df} is incompatible with RayDataFrame")
rdf, schema = self._apply_schema(rdf, schema, internal_schema)
super().__init__(schema)
self._native = rdf
Expand Down
8 changes: 7 additions & 1 deletion fugue_ray/execution_engine.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from typing import Any, Callable, Dict, List, Optional, Union

import pyarrow as pa
from duckdb import DuckDBPyConnection
from duckdb import DuckDBPyConnection, DuckDBPyRelation
from triad import Schema, assert_or_throw, to_uuid
from triad.utils.threading import RunOnce

Expand Down Expand Up @@ -269,6 +269,12 @@ def _to_auto_df(self, df: Any, schema: Any = None) -> DataFrame:
ValueError("schema must be None when df is a DataFrame"),
)
return df
if isinstance(df, DuckDBPyRelation):
assert_or_throw(
schema is None,
ValueError("schema must be None when df is a DuckDBPyRelation"),
)
return DuckDataFrame(df)
return RayDataFrame(df, schema)

def _get_remote_args(self) -> Dict[str, Any]:
Expand Down

0 comments on commit 49d3724

Please sign in to comment.