Stream Processor - A real-time high-frequency framework for mCerebrum's data sources

Overview

Stream Processor is a library and data processing tool that contains online implementations of algorithms designed to run on mCerebrum. The codebase can also be run standalone on most computing platforms.

It contains implementations of the following algorithms:

Algorithms

  • cStress: A continuous stress assessment algorithm

References

  • UbiComp 2015. cStress: Towards a Gold Standard for Continuous Stress Assessment in the Mobile Environment. Karen Hovsepian, Mustafa al'absi, Emre Ertin, Thomas Kamarck, Motoshiro Nakajima, Santosh Kumar.

Install

Clone the repository: git clone https://github.com/MD2Korg/stream-processor

or import it into IntelliJ IDEA through New -> Project from Version Control -> GitHub:

  • Use this URL: https://github.com/MD2Korg/stream-processor
  • Check Use auto-import
  • Check Create directories for empty content roots automatically
  • Choose Use gradle wrapper task configuration
  • Specify a Gradle JVM (JDK >= 1.7)
  • Wait for Gradle to resolve dependencies and build project
  • Define the Project SDK, using the same JDK chosen for the Gradle JVM
  • Open Edit Configurations
  • Add Application
    • Main class: CC_Main or Main
    • Program Arguments:
      • Directory to Cerebral Cortex data files
      • Path to cStressModelV5.json
      • Path to cStressModelRIPv4.json
      • Path to model_puffmarker.json
    • Specify classpath of module: stream-processor_main
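
The run configuration above passes four program arguments. As a minimal sketch, assuming the arguments are positional and supplied in the order listed, they could be checked before any processing starts. LaunchArgs and its field names are hypothetical and are not part of Stream Processor.

```java
// Hypothetical holder for the four program arguments listed in the run
// configuration; the positional order is an assumption.
class LaunchArgs {
    final String dataDirectory;        // directory of Cerebral Cortex data files
    final String cStressModelPath;     // path to cStressModelV5.json
    final String cStressRipModelPath;  // path to cStressModelRIPv4.json
    final String puffMarkerModelPath;  // path to model_puffmarker.json

    private LaunchArgs(String[] a) {
        dataDirectory = a[0];
        cStressModelPath = a[1];
        cStressRipModelPath = a[2];
        puffMarkerModelPath = a[3];
    }

    // Fail early with a readable message when an argument is missing.
    static LaunchArgs parse(String[] args) {
        if (args == null || args.length < 4) {
            throw new IllegalArgumentException(
                "Expected 4 arguments: <data dir> <cStressModelV5.json> "
                + "<cStressModelRIPv4.json> <model_puffmarker.json>");
        }
        return new LaunchArgs(args);
    }
}
```

Calling LaunchArgs.parse(args) at the top of a main method would surface a misconfigured run configuration before any data is read.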

Usage

Import data for replay through Stream Processor

CSVParser tp = new CSVParser();
tp.importData(path + "/rip.txt", AUTOSENSE.CHEST_RIP);
tp.importData(path + "/ecg.txt", AUTOSENSE.CHEST_ECG);
tp.importData(path + "/accelx.txt", AUTOSENSE.CHEST_ACCEL_X);
tp.importData(path + "/accely.txt", AUTOSENSE.CHEST_ACCEL_Y);
tp.importData(path + "/accelz.txt", AUTOSENSE.CHEST_ACCEL_Z);

tp.sort();
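
Importing several sensor files and then sorting produces one time-ordered sequence in which samples from different channels are interleaved. The following is a minimal, self-contained sketch of that idea. It does not use the library's CSVParser; Sample and ReplayMerge are hypothetical names introduced only for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-in for one parsed CSV row; not the library's CSVDataPoint.
class Sample {
    final long timestamp;  // milliseconds since the epoch
    final double value;
    final int channel;     // identifier of the originating sensor stream

    Sample(long timestamp, double value, int channel) {
        this.timestamp = timestamp;
        this.value = value;
        this.channel = channel;
    }
}

class ReplayMerge {
    // Concatenate the per-channel lists, then order all samples by timestamp.
    // Collections.sort is stable, so samples with equal timestamps keep the
    // order in which their channels were supplied.
    static List<Sample> mergeByTime(List<List<Sample>> channels) {
        List<Sample> merged = new ArrayList<>();
        for (List<Sample> channel : channels) {
            merged.addAll(channel);
        }
        Collections.sort(merged, new Comparator<Sample>() {
            @Override
            public int compare(Sample a, Sample b) {
                return Long.compare(a.timestamp, b.timestamp);
            }
        });
        return merged;
    }
}
```

Time-ordering matters for replay because the windowing logic assumes that timestamps never go backwards.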

Set up the Stream Processor object with a 60-second window (60000 ms), define a path for exporting data streams, and load the cStress model file.

int windowSize = 60000;

StreamProcessor streamProcessor = new StreamProcessor(windowSize);
streamProcessor.setPath(path);
streamProcessor.loadModel(cStressModelPath);

Define datapoint callbacks for DataPoint and DataPointArray.

streamProcessor.dpInterface = new DataPointInterface() {
    @Override
    public void dataPointHandler(String stream, DataPoint dp) {
        System.out.println(path + "/" + stream + " " + dp);
    }

    @Override
    public void dataPointArrayHandler(String stream, DataPointArray dp) {
        System.out.println(path + "/" + stream + " " + dp);
    }
};

Register callbacks for particular named data streams.

streamProcessor.registerCallbackDataArrayStream(StreamConstants.ORG_MD2K_CSTRESS_FV);
streamProcessor.registerCallbackDataStream(StreamConstants.ORG_MD2K_CSTRESS_DATA_ACCEL_ACTIVITY);
streamProcessor.registerCallbackDataStream(StreamConstants.ORG_MD2K_CSTRESS_PROBABILITY);
streamProcessor.registerCallbackDataStream(StreamConstants.ORG_MD2K_CSTRESS_STRESSLABEL);
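
The handler-plus-registration pattern can be pictured as a filter: only values on registered stream names reach the handler. The sketch below is a simplified illustration of that pattern, not the library's implementation. MiniDispatcher, its Handler interface, and the stream names used with it are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;

class MiniDispatcher {
    // Hypothetical single-method handler; the library's DataPointInterface
    // declares separate methods for DataPoint and DataPointArray.
    interface Handler {
        void handle(String stream, long timestamp, double value);
    }

    private final Set<String> registered = new HashSet<>();
    private Handler handler;

    void setHandler(Handler h) {
        handler = h;
    }

    void register(String stream) {
        registered.add(stream);
    }

    // Forward a value to the handler only if its stream name was registered.
    // Returns true when the handler was invoked.
    boolean publish(String stream, long timestamp, double value) {
        if (handler == null || !registered.contains(stream)) {
            return false;
        }
        handler.handle(stream, timestamp, value);
        return true;
    }
}
```

Registering by name keeps the handler from being flooded with every intermediate stream when only a few outputs, such as a probability or label, are of interest.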

Replay the imported data through Stream Processor, calling streamProcessor.go() to process the buffered data each time a window of windowSize milliseconds has elapsed.

long windowStartTime = -1; // -1 means the first window has not been aligned yet
for (CSVDataPoint ap : tp) {
    DataPoint dp = new DataPoint(ap.timestamp, ap.value);

    // Align the first window boundary using the first data point
    if (windowStartTime < 0) {
        windowStartTime = Time.nextEpochTimestamp(dp.timestamp, windowSize);
    }

    // Process the buffer every windowSize milliseconds
    if ((dp.timestamp - windowStartTime) >= windowSize) {
        streamProcessor.go();
        windowStartTime += windowSize;
    }

    // Buffer the data point on its channel
    streamProcessor.add(ap.channel, dp);
}
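
The window bookkeeping in the replay loop can be examined in isolation. The sketch below replays a list of timestamps and reports which windows would be closed. WindowReplay and alignUp are hypothetical names; alignUp rounds a timestamp up to the next multiple of the window size, and whether Time.nextEpochTimestamp behaves exactly this way is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

class WindowReplay {
    // Hypothetical helper: round a non-negative timestamp up to the next
    // multiple of windowSize. That the library's Time.nextEpochTimestamp
    // does the same is an assumption.
    static long alignUp(long timestamp, long windowSize) {
        return (timestamp / windowSize + 1) * windowSize;
    }

    // Returns the start time of every window closed while replaying the
    // given time-ordered, non-negative timestamps.
    static List<Long> closedWindows(long[] timestamps, long windowSize) {
        List<Long> closed = new ArrayList<>();
        long windowStart = -1;
        for (long t : timestamps) {
            if (windowStart < 0) {
                windowStart = alignUp(t, windowSize);
            }
            // 'while' rather than 'if': a gap longer than one window
            // closes several windows in a row.
            while (t - windowStart >= windowSize) {
                closed.add(windowStart); // stands in for streamProcessor.go()
                windowStart += windowSize;
            }
        }
        return closed;
    }
}
```

This sketch differs from the replay loop in one respect: it uses a while loop, so a gap in the data longer than one window advances the window start past the gap. The replay loop advances by at most one window per data point.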

Release History

  • 0.1.0 Initial release

Contributors

License

BSD 2-Clause

More information

Provide feedback or submit a bug report

http://docs.md2k.org/feedback

Support

MD2K is supported by the National Institutes of Health Big Data to Knowledge Initiative Grant #1U54EB020404

Team: Cornell Tech, GA Tech, U Memphis, Northwestern, Ohio State, Open mHealth, Rice, UCLA, UCSD, UCSF, U Mass, U Michigan, WVU
