
Pipeline validation using Great Expectations library


pfife00/stock-expectations


Stock Expectations: Anomaly detection using Great Expectations

Project Presentation

The project's presentation can be viewed here.

Project Description

There is an inherent belief in the stock market that the price of a stock fully reflects all available data. But what if that data is flawed? What if it contains anomalies?
How do we go about finding these anomalies?

Great Expectations

To help answer this question, I employed the Great Expectations library, which applies "expectations" to the data pipeline in order to find data that does not meet them. An example expectation applied to the pipeline is shown below:

sdf.expect_column_max_to_be_between("MAX_PRICE", 1, 500, result_format="BOOLEAN_ONLY")

This expectation requires the maximum value of the MAX_PRICE column to lie between 1 and 500. If the expectation is not met, the user is shown a flag (see the dashboard image) so that appropriate action can be taken.
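To make the semantics of this check concrete, the logic behind `expect_column_max_to_be_between` can be approximated in plain Python. This is a minimal sketch, not Great Expectations' own implementation: the helper name `column_max_between` is hypothetical, both bounds are treated as inclusive, missing values are skipped, and the return value only imitates the `BOOLEAN_ONLY` result format, which reduces the validation result to a success flag. The input values below are illustrative only.

```python
from typing import Iterable, Optional


def column_max_between(
    values: Iterable[Optional[float]],
    min_value: float,
    max_value: float,
) -> dict:
    """Hypothetical stand-in for expect_column_max_to_be_between.

    Takes the maximum of the non-null values and checks that it lies in
    [min_value, max_value], both bounds inclusive. Returns only a success
    flag, mimicking the BOOLEAN_ONLY result format.
    """
    # Skip missing values before taking the maximum.
    present = [v for v in values if v is not None]
    if not present:
        # Assumption: a column with no values cannot satisfy the expectation.
        return {"success": False}
    observed_max = max(present)
    return {"success": min_value <= observed_max <= max_value}


# Illustrative batch whose largest MAX_PRICE (812.5) exceeds the 1..500 range.
print(column_max_between([12.3, 45.0, 812.5], 1, 500))  # {'success': False}
print(column_max_between([12.3, None, 45.0], 1, 500))   # {'success': True}
```

Because only the column maximum is compared against the range, a single out-of-range price is enough to fail the whole batch, which is what makes this expectation useful as a coarse anomaly flag.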

[Dashboard screenshot, 2019-07-04]

Engineering Challenge

Employing Great Expectations in a streaming environment proved challenging, because the library does not currently support streaming data. To overcome this, several data processing steps had to be performed in Spark before Great Expectations could be applied.
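The exact Spark steps are not described here. One common way to apply a batch-oriented validator to a stream is micro-batching: collect incoming records into small, finite batches and validate each batch as if it were a static dataset. Spark Structured Streaming offers `foreachBatch` for this pattern, but whether this project used it is not stated. The sketch below shows the general idea in plain Python, without Spark or Great Expectations. The function names `micro_batches`, `validate_stream` and `max_price_ok`, as well as the batch size, are hypothetical.

```python
from itertools import islice
from typing import Callable, Iterable, Iterator, List


def micro_batches(stream: Iterable[dict], batch_size: int) -> Iterator[List[dict]]:
    """Group a (possibly unbounded) record stream into finite, fixed-size batches."""
    if batch_size < 1:
        raise ValueError("batch_size must be positive")
    it = iter(stream)
    while True:
        # Pull up to batch_size records; an empty list means the stream ended.
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield batch


def validate_stream(
    stream: Iterable[dict],
    batch_size: int,
    check: Callable[[List[dict]], bool],
) -> Iterator[bool]:
    """Run a batch-only check on each micro-batch and yield its pass/fail flag."""
    for batch in micro_batches(stream, batch_size):
        yield check(batch)


# Hypothetical check: the largest MaxPrice in a batch must lie in [1, 500].
def max_price_ok(batch: List[dict]) -> bool:
    return 1 <= max(record["MaxPrice"] for record in batch) <= 500


# Illustrative records; the second batch contains an out-of-range price.
records = ({"MaxPrice": p} for p in [10.0, 20.0, 600.0, 30.0, 40.0])
print(list(validate_stream(records, 2, max_price_ok)))  # [True, False, True]
```

The trade-off in this design is latency versus context: smaller batches flag anomalies sooner, while larger batches give aggregate expectations such as a column maximum more data to work with.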

Tech Stack

S3, Kafka, Spark, Great Expectations, PostgreSQL, Dash

Datasets

The Deutsche Börse Public Dataset is a near-real-time stream of stock trading data stored in an external S3 bucket. The data dictionary can be viewed in the dataset's GitHub repo.

Data Dictionary

The Xetra data is stored in an S3 bucket at the following location:
s3://deutsche-boerse-xetra-pds

Each Xetra CSV file in the bucket contains the following columns:

  • ISIN: ISIN of the security (string)
  • Mnemonic: Stock exchange ticker symbol (string)
  • SecurityDesc: Description of the security (string)
  • SecurityType: Type of security (string)
  • Currency: Currency in which the product is traded, as an ISO 4217 code (string; see https://en.wikipedia.org/wiki/ISO_4217)
  • SecurityID: Unique identifier for each contract (int)
  • Date: Date of the trading period (date)
  • Time: Minute of trading to which this entry relates (time, hh:mm)
  • StartPrice: Trading price at the start of the period (float)
  • MaxPrice: Maximum price over the period (float)
  • MinPrice: Minimum price over the period (float)
  • EndPrice: Trading price at the end of the period (float)
  • TradedVolume: Total value traded (float)
  • NumberOfTrades: Number of distinct trades during the period (int)
The Deutsche Börse Public Dataset (DBG PDS) S3 bucket is available from here.
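Before any numeric expectation can be evaluated, each CSV row has to be converted from text into the types listed in the data dictionary. The following is a minimal sketch of that step using only the Python standard library. It assumes the header row uses exactly the column names above, that Date is formatted as YYYY-MM-DD, and that Time is formatted as hh:mm. The `XetraRecord` class and the `parse_xetra_csv` function are hypothetical names, not part of the dataset or this project.

```python
import csv
import io
from dataclasses import dataclass
from datetime import date, datetime, time
from typing import Iterator


@dataclass
class XetraRecord:
    """One row of a Xetra CSV file, typed according to the data dictionary."""
    isin: str
    mnemonic: str
    security_desc: str
    security_type: str
    currency: str
    security_id: int
    trade_date: date
    trade_time: time
    start_price: float
    max_price: float
    min_price: float
    end_price: float
    traded_volume: float
    number_of_trades: int


def parse_xetra_csv(text: str) -> Iterator[XetraRecord]:
    """Parse Xetra CSV text (with a header row) into typed records.

    Assumes header names match the data dictionary, Date is YYYY-MM-DD
    and Time is hh:mm.
    """
    for row in csv.DictReader(io.StringIO(text)):
        yield XetraRecord(
            isin=row["ISIN"],
            mnemonic=row["Mnemonic"],
            security_desc=row["SecurityDesc"],
            security_type=row["SecurityType"],
            currency=row["Currency"],
            security_id=int(row["SecurityID"]),
            trade_date=datetime.strptime(row["Date"], "%Y-%m-%d").date(),
            trade_time=datetime.strptime(row["Time"], "%H:%M").time(),
            start_price=float(row["StartPrice"]),
            max_price=float(row["MaxPrice"]),
            min_price=float(row["MinPrice"]),
            end_price=float(row["EndPrice"]),
            traded_volume=float(row["TradedVolume"]),
            number_of_trades=int(row["NumberOfTrades"]),
        )
```

Converting the columns up front means that a check such as "MaxPrice between 1 and 500" compares numbers rather than strings. It also means that a malformed value raises an error at parse time instead of silently passing validation.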

Running the program

To run the program, go to bigdatawork.xyz and click the start button at the top left of the page. After a few seconds, the page will start updating. To stop the program, click the stop button.
