Chore: Make release 1.0.58
martinroberson committed Feb 15, 2024
1 parent 22a5806 commit 8daf86a
Showing 11 changed files with 1,547 additions and 63 deletions.
68 changes: 60 additions & 8 deletions gs_quant/api/gs/data.py
@@ -14,16 +14,19 @@
under the License.
"""
import datetime as dt
import json
import logging
import time
from copy import copy
from enum import Enum
from itertools import chain
from typing import Iterable, List, Optional, Tuple, Union, Dict

import cachetools
import pandas as pd
from cachetools import TTLCache
import json
from dateutil import parser

from gs_quant.api.data import DataApi
from gs_quant.base import Base
from gs_quant.data.core import DataContext, DataFrequency
@@ -35,7 +38,6 @@
from gs_quant.target.coordinates import MDAPIDataBatchResponse, MDAPIDataQuery, MDAPIDataQueryResponse, MDAPIQueryField
from gs_quant.target.data import DataQuery, DataQueryResponse, DataSetCatalogEntry
from gs_quant.target.data import DataSetEntity, DataSetFieldEntity
from dateutil import parser
from .assets import GsIdType
from ..api_cache import ApiRequestCache
from ...target.assets import EntityQuery, FieldFilterMap
@@ -548,12 +550,12 @@ def get_mxapi_backtest_data(cls, builder, start_time=None, end_time=None, num_sa
return df

@staticmethod
def build_market_data_query(asset_ids: List[str],
query_type: Union[QueryType, str],
where: Union[FieldFilterMap, Dict] = None,
source: Union[str] = None,
real_time: bool = False,
measure='Curve'):
def _get_market_data_filters(asset_ids: List[str],
query_type: Union[QueryType, str],
where: Union[FieldFilterMap, Dict] = None,
source: Union[str] = None,
real_time: bool = False,
measure='Curve'):
inner = {
'entityIds': asset_ids,
'queryType': query_type.value if isinstance(query_type, QueryType) else query_type,
@@ -564,6 +566,56 @@
measure
]
}
return inner

@staticmethod
def build_interval_chunked_market_data_queries(asset_ids: List[str],
query_type: Union[QueryType, str],
where: Union[FieldFilterMap, Dict] = None,
source: Union[str] = None,
real_time: bool = False,
measure='Curve',
parallel_pool_size: int = 1) -> List[dict]:
def chunk_time(start, end, pool_size) -> tuple:
chunk_duration = (end - start) / pool_size
current = start
for _ in range(pool_size):
next_end = current + chunk_duration
yield current, next_end
current = next_end

queries = []
if real_time:
start, end = DataContext.current.start_time, DataContext.current.end_time
start_key, end_key = 'startTime', 'endTime'
else:
start, end = DataContext.current.start_date, DataContext.current.end_date
start_key, end_key = 'startDate', 'endDate'

for s, e in chunk_time(start, end, parallel_pool_size):
inner = copy(GsDataApi._get_market_data_filters(asset_ids, query_type, where, source, real_time, measure))
inner[start_key], inner[end_key] = s, e
queries.append({
'queries': [inner]
})

log_debug("", _logger, f"Created {len(queries)} market data queries. Pool size = {parallel_pool_size}")

return queries

@staticmethod
def build_market_data_query(asset_ids: List[str],
query_type: Union[QueryType, str],
where: Union[FieldFilterMap, Dict] = None,
source: Union[str] = None,
real_time: bool = False,
measure='Curve',
parallel_pool_size: int = 1) -> Union[dict, List[dict]]:
if parallel_pool_size > 1:
return GsDataApi.build_interval_chunked_market_data_queries(asset_ids, query_type, where, source, real_time,
measure, parallel_pool_size)

inner = GsDataApi._get_market_data_filters(asset_ids, query_type, where, source, real_time, measure)
if DataContext.current.interval is not None:
inner['interval'] = DataContext.current.interval
if real_time:
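A note on the new chunking path above: build_interval_chunked_market_data_queries splits the active DataContext window into parallel_pool_size equal slices and emits one query payload per slice, while build_market_data_query keeps its original single-dict return shape when the pool size is 1 and returns a List[dict] otherwise. Below is a minimal, self-contained sketch of the chunk_time arithmetic, run on plain datetimes with no gs_quant dependencies; the dates are illustrative only.

```python
import datetime as dt

def chunk_time(start, end, pool_size):
    # Yield pool_size equal (start, end) sub-intervals covering [start, end];
    # each chunk's end is the next chunk's start, as in the generator above.
    chunk_duration = (end - start) / pool_size
    current = start
    for _ in range(pool_size):
        next_end = current + chunk_duration
        yield current, next_end
        current = next_end

start = dt.datetime(2024, 2, 15, 0, 0)
end = dt.datetime(2024, 2, 15, 12, 0)
for s, e in chunk_time(start, end, 4):
    print(s.time(), '->', e.time())
# 00:00:00 -> 03:00:00
# 03:00:00 -> 06:00:00
# 06:00:00 -> 09:00:00
# 09:00:00 -> 12:00:00
```

Each slice becomes one {'queries': [inner]} payload carrying its own startTime/endTime (startDate/endDate for non-real-time queries), so downstream code can fan the list out to a worker pool.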
41 changes: 25 additions & 16 deletions gs_quant/api/gs/secmaster.py
@@ -27,7 +27,6 @@
from gs_quant.session import GsSession
from gs_quant.target.secmaster import SecMasterAssetType

DEFAULT_EFFECTIVE_DATE = dt.date(2100, 1, 1)
DEFAULT_SCROLL_PAGE_SIZE = 500


@@ -105,7 +104,7 @@ def get_security(cls, id_value: str,

@classmethod
def get_many_securities(cls, type_: SecMasterAssetType = None,
effective_date: dt.date = DEFAULT_EFFECTIVE_DATE,
effective_date: dt.date = None,
limit: int = 10, flatten=False,
is_primary=None,
offset_key: str = None,
@@ -127,11 +126,11 @@ def get_many_securities(cls, type_: SecMasterAssetType = None,
raise ValueError("Neither '_type' nor 'query_params' are provided")

params = {
'effectiveDate': effective_date,
"limit": limit
}

cls.prepare_params(params, is_primary, offset_key, type_)
cls.prepare_params(params, is_primary, offset_key, type_, effective_date)

params = {**params, **query_params}
payload = json.loads(json.dumps(params, cls=JSONEncoder))
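A note on the effectiveDate change running through this file: prepare_params now owns the effectiveDate key and only adds it when a date is actually supplied, so requests no longer default to the removed DEFAULT_EFFECTIVE_DATE sentinel of 2100-01-01. A standalone sketch of the resulting behaviour — the body mirrors the committed helper (with a return added for printability), and the example values are illustrative:

```python
import datetime as dt

def prepare_params(params, is_primary=None, offset_key=None, type_=None,
                   effective_date=None):
    # Mirror of the committed helper: optional filters are added to the
    # request payload only when the caller actually provides them.
    if type_ is not None:
        params["type"] = type_.value
    if is_primary is not None:
        params["isPrimary"] = is_primary
    if offset_key is not None:
        params["offsetKey"] = offset_key
    if effective_date is not None:
        params["effectiveDate"] = effective_date
    return params

print(prepare_params({"limit": 10}))
# {'limit': 10}  -- no effectiveDate key, so the service default applies
print(prepare_params({"limit": 10}, effective_date=dt.date(2024, 2, 15)))
# {'limit': 10, 'effectiveDate': datetime.date(2024, 2, 15)}
```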

@@ -145,7 +144,7 @@

@classmethod
def get_all_securities(cls, type_: SecMasterAssetType,
effective_date: dt.date = DEFAULT_EFFECTIVE_DATE,
effective_date: dt.date = None,
is_primary=None,
flatten=False, **query_params) -> \
Union[Iterable[dict], None]:
@@ -159,7 +158,13 @@ def get_all_securities(cls, type_: SecMasterAssetType,
@param effective_date: As of date for query
@return:" list of dict
"""
response = cls.get_many_securities(type_, effective_date, limit=DEFAULT_SCROLL_PAGE_SIZE, offset_key=None,
if 'limit' in query_params:
limit = query_params['limit']
del query_params['limit']
else:
limit = DEFAULT_SCROLL_PAGE_SIZE

response = cls.get_many_securities(type_, effective_date, limit=limit, offset_key=None,
flatten=flatten, is_primary=is_primary,
**query_params)
if response is None or "offsetKey" not in response:
@@ -170,8 +175,9 @@

results = response["results"]
offset_key = response["offsetKey"]

fn = partial(cls.get_many_securities, type_=type_, effective_date=effective_date,
limit=DEFAULT_SCROLL_PAGE_SIZE, flatten=flatten,
limit=limit, flatten=flatten,
**query_params)
results.extend(cls.__fetch_all(fn, offset_key))
response["totalResults"] = len(results)
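A note on the pagination change: get_all_securities previously always scrolled at DEFAULT_SCROLL_PAGE_SIZE; it now pops an optional limit out of query_params and reuses it both for the first request and, via the functools.partial above, for every follow-up page. A hypothetical call — it assumes an authenticated GsSession, that GsSecurityMasterApi is the class hosting these classmethods, and that SecMasterAssetType.ETF is a valid enum member:

```python
from gs_quant.api.gs.secmaster import GsSecurityMasterApi
from gs_quant.target.secmaster import SecMasterAssetType

# Scroll the full result set 100 rows per page instead of the default 500.
# 'limit' is consumed locally and is not forwarded as a server-side filter.
securities = GsSecurityMasterApi.get_all_securities(SecMasterAssetType.ETF,
                                                    limit=100)
```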
@@ -182,7 +188,7 @@
@classmethod
def get_security_data(cls, id_value: str,
id_type: SecMasterIdentifiers,
effective_date: dt.date = DEFAULT_EFFECTIVE_DATE) -> Union[dict, None]:
effective_date: dt.date = None) -> Union[dict, None]:
"""
Get flatten asset reference data
@@ -337,7 +343,7 @@ def get_corporate_actions(cls, id_value: str, id_type: SecMasterIdentifiers = Se
def get_capital_structure(cls, id_value: str,
id_type: CapitalStructureIdentifiers,
type_: SecMasterAssetType = None, is_primary: bool = None,
effective_date: dt.date = DEFAULT_EFFECTIVE_DATE) -> dict:
effective_date: dt.date = None) -> dict:
"""
Get a capital structure of the given company by id_value of the security.
It runs in batches till all data is fetched
@@ -388,22 +394,23 @@ def __capital_structure_aggregate(cls, asset_types_total, results)
def _get_capital_structure(cls, id_value: str, id_type: Union[CapitalStructureIdentifiers, SecMasterIdentifiers],
type_, is_primary, effective_date, offset_key: Union[str, None]):
params = {
id_type.value: id_value,
"effectiveDate": effective_date
id_type.value: id_value
}
cls.prepare_params(params, is_primary, offset_key, type_)
cls.prepare_params(params, is_primary, offset_key, type_, effective_date)
payload = json.loads(json.dumps(params, cls=JSONEncoder))
r = GsSession.current._get("/markets/capitalstructure", payload=payload)
return r

@classmethod
def prepare_params(cls, params, is_primary, offset_key, type_):
def prepare_params(cls, params, is_primary, offset_key, type_, effective_date=None):
if type_ is not None:
params["type"] = type_.value
if is_primary is not None:
params["isPrimary"] = is_primary
if offset_key is not None:
params["offsetKey"] = offset_key
if effective_date is not None:
params["effectiveDate"] = effective_date

@classmethod
def get_deltas(cls, start_time: dt.datetime = None, end_time: dt.datetime = None, raw: bool = None) -> \
@@ -429,7 +436,7 @@ def get_deltas(cls, start_time: dt.datetime = None, end_time: dt.datetime = None
return r

@classmethod
def get_exchanges(cls, effective_date: dt.date = DEFAULT_EFFECTIVE_DATE,
def get_exchanges(cls, effective_date: dt.date = None,
**query_params: Dict[str, Union[str, Iterable[str]]]):
"""
Returns reference data for exchanges - e.g. MICs, exchange codes, name, listing country.
@@ -452,7 +459,7 @@ def get_exchanges(cls, effective_date: dt.date = DEFAULT_EFFECTIVE_DATE,
return response

@classmethod
def _get_exchanges(cls, effective_date: dt.date = DEFAULT_EFFECTIVE_DATE, limit: int = 10,
def _get_exchanges(cls, effective_date: dt.date = None, limit: int = 10,
query_params=None,
offset_key: Union[str, None] = None):

@@ -463,9 +470,11 @@ def _get_exchanges(cls, effective_date: dt.date = DEFAULT_EFFECTIVE_DATE, limit:
if qp not in allowed_keys:
raise ValueError(f" Parameter '{qp}' is not supported. Allowed parameters: {allowed_keys}")
params = {
"effectiveDate": effective_date,
"limit": limit
}
if effective_date is not None:
params['effectiveDate'] = effective_date

params = {**params, **query_params}
if offset_key is not None:
params["offsetKey"] = offset_key
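The exchange endpoints follow the same convention as the rest of this file: effectiveDate is sent only when a date is explicitly passed, instead of being pinned to the old sentinel. A hypothetical usage sketch — it assumes an authenticated GsSession and that GsSecurityMasterApi is the class hosting get_exchanges:

```python
import datetime as dt

from gs_quant.api.gs.secmaster import GsSecurityMasterApi

# As-of-now view: no effectiveDate is sent, so the service default applies.
exchanges = GsSecurityMasterApi.get_exchanges()

# Point-in-time view: pin exchange reference data to an explicit date.
exchanges_eoy = GsSecurityMasterApi.get_exchanges(effective_date=dt.date(2023, 12, 29))
```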
