Skip to content

Commit

Permalink
Merge pull request #177 from StochSS/develop
Browse files Browse the repository at this point in the history
v1.0.1 - #175 #176 #178
  • Loading branch information
briandrawert committed Feb 28, 2023
2 parents e36f2be + 12a6a7c commit 853447f
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 25 deletions.
22 changes: 22 additions & 0 deletions .cloud.dockerfile
@@ -0,0 +1,22 @@
# Cloud image for stochss-compute: installs the package with AWS extras and
# runs the cluster entrypoint.
FROM python:3.11.1-buster

LABEL authors="Ethan Green <egreen4@unca.edu>, Matthew Dippel <mdip226@gmail.com>"

# set up virtual environment inside container
ENV VIRTUAL_ENV=/opt/venv
RUN python3 -m venv $VIRTUAL_ENV
# NOTE(review): PYTHONPATH pointing at the venv root (not its site-packages)
# has no effect on imports — confirm whether this line is still needed.
ENV PYTHONPATH="$VIRTUAL_ENV:$PYTHONPATH"
# "Activate" the venv for all subsequent layers. Executables (python, pip,
# console scripts) live in $VIRTUAL_ENV/bin, so that directory — not the venv
# root — must lead PATH; otherwise the system pip is used below and the
# install bypasses the venv entirely.
ENV PATH="$VIRTUAL_ENV/bin:$PATH"
# make the venv a volume
VOLUME [ "/opt/venv" ]

WORKDIR /usr/src/app

# Copy the package source plus everything setup needs (requirements, setup.py/
# setup.cfg, README) and install with the AWS optional dependencies.
COPY stochss_compute /usr/src/app/stochss_compute
COPY requirements.txt *.py *.md *.dockerfile *.cfg /usr/src/app/
RUN pip install '.[AWS]'

# Default port for the stochss-compute server.
EXPOSE 29681

CMD [ "stochss-compute-cluster" ]
9 changes: 8 additions & 1 deletion .github/workflows/docker.yml
Expand Up @@ -29,5 +29,12 @@ jobs:
with:
push: true
file: .api.dockerfile
tags: stochss/stochss-compute:latest, stochss/stochss-compute:cloud
tags: stochss/stochss-compute:latest
-
name: Build and push to stochss org on DockerHub
uses: docker/build-push-action@v2
with:
push: true
file: .cloud.dockerfile
tags: stochss/stochss-compute:cloud
- run: echo "🍏 This job's status is ${{ job.status }}."
11 changes: 6 additions & 5 deletions .gitignore
Expand Up @@ -142,13 +142,14 @@ Thumbs.db #thumbnail cache on Windows
# profiling data
.prof


### vscode ###
.vscode/*
!.vscode/settings.json
!.vscode/tasks.json
!.vscode/launch.json
!.vscode/extensions.json
*.code-workspace

# StochSS-Compute
**/dask-worker-space
**/cache/

# Development team
**/.mdip/
bs*
8 changes: 4 additions & 4 deletions docs/tutorials/aws/aws.rst
Expand Up @@ -48,11 +48,11 @@ AWS Configuration

2. In order to make the AWS API calls to your account, you need an AWS access key and access key ID.

From the IAM dashboard, click 'Manage access keys'.
From the IAM dashboard, click 'My security credentials'.

Then, under the Access keys tab, click 'Create New Access Key'.
Then, under the Access keys tab, click 'Create Access Key'.

This file can only be downloaded once, but if something happens you can just make a new one.
Make sure to record the Access and Secret keys. We recommend you download the CSV file now, as it can only be downloaded once. If you lose these keys, you can simply create new ones.

This file contains the Access Key ID and a Secret Access Key.

Expand All @@ -75,4 +75,4 @@ AWS Configuration
.. code-block:: python
from dotenv import load_dotenv
load_dotenv() # Loads from a file named .env by default
load_dotenv() # Loads from a file named .env by default
2 changes: 1 addition & 1 deletion stochss_compute/__version__.py
Expand Up @@ -26,7 +26,7 @@
# =============================================================================


__version__ = '1.0.0'
__version__ = '1.0.1'

__title__ = "stochss-compute"
__description__ = "A compute delegation package for the StochSS family of stochastic simulators"
Expand Down
38 changes: 24 additions & 14 deletions stochss_compute/server/status.py
Expand Up @@ -22,18 +22,26 @@
from tornado.web import RequestHandler
from stochss_compute.core.errors import RemoteSimulationError
from stochss_compute.core.messages import SimStatus, StatusResponse

from stochss_compute.server.cache import Cache

class StatusHandler(RequestHandler):
'''
Endpoint for requesting the status of a simulation.
'''

def __init__(self, application, request, **kwargs):
    '''
    Pre-declare handler state, then delegate to the Tornado base class.

    The attributes are set to None *before* calling super().__init__()
    because the base RequestHandler constructor invokes initialize(),
    which fills in scheduler_address and cache_dir from the route kwargs;
    declaring them first guarantees they always exist on the instance.

    :param application: The Tornado application.
    :param request: The incoming HTTP request.
    '''
    self.scheduler_address = None  # Dask scheduler address, set by initialize()
    self.cache_dir = None  # results cache directory, set by initialize()
    self.task_id = None  # ID of the running simulation, set per-request in get()
    self.results_id = None  # results cache key, set per-request in get()
    super().__init__(application, request, **kwargs)

def data_received(self, chunk: bytes):
    '''
    Streamed request bodies are not supported by this endpoint.

    :param chunk: Chunk of the streamed request body.
    :type chunk: bytes

    :raises NotImplementedError: Always.
    '''
    raise NotImplementedError()

def initialize(self, scheduler_address, cache_dir):
'''
Sets the address to the Dask scheduler and the cache directory.
:param scheduler_address: Scheduler address.
:type scheduler_address: str
Expand All @@ -52,30 +60,34 @@ async def get(self, results_id, n_traj, task_id):
:param n_traj: Number of trajectories in the request. Default 1.
:type n_traj: str
:param task_id: ID of the running simulation. Required.
:type task_id: str
'''
if '' in (results_id, n_traj):
self.set_status(404, reason=f'Malformed request: {self.request.uri}')
self.finish()
raise RemoteSimulationError(f'Malformed request: {self.request.uri}')

self.results_id = results_id
self.task_id = task_id
n_traj = int(n_traj)

cache = Cache(self.cache_dir, results_id)

print(f'{datetime.now()} | <{self.request.remote_ip}> | \
Status Request | <{results_id}> | Trajectories: {n_traj} | \
Task ID: {task_id}' )
msg = f'{datetime.now()} | <{results_id}> | <{task_id}> |Status: '
Status Request | <{results_id}> | Trajectories: {n_traj} | \
Task ID: {task_id}' )

msg = f'{datetime.now()} | <{results_id}> | <{task_id}> | Status: '

exists = cache.exists()
if exists:
empty = cache.is_empty()
if empty:
if self.task_id not in ('', None):
state, err = await self.check_with_scheduler()

print(msg+SimStatus.RUNNING.name+f' | Task: {state} | error: {err}')
state, err = await self._check_with_scheduler()
print(msg + SimStatus.RUNNING.name + f' | Task: {state} | Error: {err}')
if state == 'erred':
self._respond_error(err)
else:
Expand All @@ -90,7 +102,7 @@ async def get(self, results_id, n_traj, task_id):
self._respond_ready()
else:
if self.task_id not in ('', None):
state, err = await self.check_with_scheduler()
state, err = await self._check_with_scheduler()
print(msg+SimStatus.RUNNING.name+f' | Task: {state} | error: {err}')
if state == 'erred':
self._respond_error(err)
Expand All @@ -103,7 +115,6 @@ async def get(self, results_id, n_traj, task_id):
print(msg+SimStatus.DOES_NOT_EXIST.name)
self._respond_dne()


def _respond_ready(self):
status_response = StatusResponse(SimStatus.READY)
self.write(status_response.encode())
Expand Down Expand Up @@ -133,14 +144,13 @@ async def _check_with_scheduler(self):
# define function here so that it is pickle-able
def scheduler_task_state(task_id, dask_scheduler=None):
    '''
    Look up a task on the Dask scheduler and report its state.

    :param task_id: Key of the task to look up.
    :param dask_scheduler: The scheduler instance (injected by Dask when run
        via Client.run_on_scheduler).

    :returns: (state, exception_text) pair — (None, None) when the task is
        unknown to the scheduler; exception_text is None when the task has
        no recorded error text.
    :rtype: tuple
    '''
    task = dask_scheduler.tasks.get(task_id)
    if task is None:
        return None, None
    # Normalize an empty exception_text to None so callers only see real errors.
    failure = task.exception_text or None
    return task.state, failure

# Do not await. Reasons. It returns sync.
ret = client.run_on_scheduler(scheduler_task_state, self.task_id)
client.close()
return ret

0 comments on commit 853447f

Please sign in to comment.