Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Zenodo remote provider for transparent storage on and retrieval from Zenodo #1455

Merged
merged 2 commits into from Mar 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/main.yml
Expand Up @@ -91,6 +91,7 @@ jobs:
- name: Run tests
env:
CI: true
ZENODO_SANDBOX_PAT: ${{ secrets.ZENODO_SANDBOX_PAT }}
run: |
# activate conda env
export PATH="/usr/share/miniconda/bin:$PATH"
Expand Down Expand Up @@ -195,5 +196,6 @@ jobs:
- name: Run tests
env:
CI: true
ZENODO_SANDBOX_PAT: ${{ secrets.ZENODO_SANDBOX_PAT }}
run: |
python -m pytest -v -x tests/tests.py
36 changes: 35 additions & 1 deletion docs/snakefiles/remote_files.rst
Expand Up @@ -798,8 +798,42 @@ Note that the filename should not include the ``.cip`` ending that is sometimes

Upon download, Snakemake will automatically decrypt the file and check the MD5 hash.

Zenodo
======

`Zenodo <https://zenodo.org>`_ is a catch-all open data and software repository.
Snakemake allows file upload and download from Zenodo.
To access your Zenodo files you need to set up a Zenodo account and create a personal access token with at least write scope.
The personal access token must be supplied as the ``access_token`` argument.
You need to supply the deposition id as ``deposition`` to upload files to or download files from your deposition.
If no deposition id is supplied, Snakemake creates a new deposition for upload.
The Zenodo UI and REST API responses were designed with uploads of a total of 20-30 files in mind.
Avoid creating uploads with too many files; instead, group and zip them to make their distribution to end-users easier.

.. code-block:: python

    from snakemake.remote.zenodo import RemoteProvider
    import os

    # let Snakemake assert the presence of the required environment variable
    envvars:
        "MYZENODO_PAT"

    access_token=os.environ["MYZENODO_PAT"]
    zenodo = RemoteProvider(deposition="your deposition id", access_token=access_token)

    rule upload:
        input:
            "output/results.csv"
        output:
            zenodo.remote("results.csv")
        shell:
            "cp {input} {output}"


It is possible to use the `Zenodo sandbox environment <https://sandbox.zenodo.org>`_ for testing by setting the ``sandbox=True`` argument.
Using the sandbox environment requires setting up a separate sandbox account with its own personal access token.

AUTO
Auto
====

A wrapper which automatically selects an appropriate remote provider based on the url's scheme.
Expand Down
Binary file added fg.pdf
Binary file not shown.
5 changes: 5 additions & 0 deletions snakemake/exceptions.py
Expand Up @@ -466,6 +466,11 @@ def __init__(self, msg, lineno=None, snakefile=None):
super().__init__(msg, lineno=lineno, snakefile=snakefile)


class ZenodoFileException(RuleException):
    """Error concerning a file stored on Zenodo (e.g. a checksum mismatch).

    Takes the same arguments as ``RuleException`` and forwards them
    unchanged: the message plus an optional line number and snakefile.
    """

    def __init__(self, msg, lineno=None, snakefile=None):
        super().__init__(
            msg,
            lineno=lineno,
            snakefile=snakefile,
        )


class ClusterJobException(RuleException):
def __init__(self, job_info, jobid):
super().__init__(
Expand Down
191 changes: 191 additions & 0 deletions snakemake/remote/zenodo.py
@@ -0,0 +1,191 @@
__author__ = "Taavi Päll"
__copyright__ = "Copyright 2019, Taavi Päll"
__email__ = "tapa741@gmail.com"
__license__ = "MIT"

import os
import hashlib
from collections import namedtuple
import requests
from requests.exceptions import HTTPError
from snakemake.remote import (
AbstractRemoteObject,
AbstractRemoteProvider,
AbstractRemoteRetryObject,
)
from snakemake.exceptions import ZenodoFileException, WorkflowError
from snakemake.common import lazy_property


# Metadata of a single file inside a deposition, as built by
# ZENHelper.get_files() from the deposition's file listing: the file's
# checksum, its size as an int, its id and its download link.
ZenFileInfo = namedtuple("ZenFileInfo", ["checksum", "filesize", "id", "download"])


class RemoteProvider(AbstractRemoteProvider):
    """Snakemake remote provider that stores and retrieves files on Zenodo.

    All positional and keyword arguments are forwarded both to the abstract
    base provider and to :class:`ZENHelper`, which reads ``access_token``
    (required), ``sandbox`` and ``deposition`` from the keyword arguments.
    """

    def __init__(self, *args, stay_on_remote=False, **kwargs):
        super().__init__(*args, stay_on_remote=stay_on_remote, **kwargs)
        # A single helper is created per provider; remote objects created
        # with this provider obtain it through remote_interface().
        self._zen = ZENHelper(*args, **kwargs)

    def remote_interface(self):
        """Return the ZENHelper instance owned by this provider."""
        return self._zen

    @property
    def default_protocol(self):
        """The protocol that is prepended to the path when no protocol is specified."""
        return "https://"

    @property
    def available_protocols(self):
        """List of valid protocols for this remote provider."""
        return ["https://"]


class RemoteObject(AbstractRemoteRetryObject):
    """A single file living in a Zenodo deposition.

    Files are identified inside the deposition by the basename of the
    local (respectively remote) file path.
    """

    def __init__(
        self, *args, keep_local=False, stay_on_remote=False, provider=None, **kwargs
    ):
        super().__init__(
            *args,
            keep_local=keep_local,
            stay_on_remote=stay_on_remote,
            provider=provider,
            **kwargs
        )
        # Reuse the provider's helper when there is one; otherwise build a
        # dedicated helper from the arguments given to this object.
        self._zen = (
            provider.remote_interface() if provider else ZENHelper(*args, **kwargs)
        )

    # === Implementations of abstract class members ===
    def _stats(self):
        """Return the ZenFileInfo of this file; KeyError if it is not in the deposition."""
        deposited = self._zen.get_files()
        return deposited[os.path.basename(self.local_file())]

    def exists(self):
        """Tell whether a file with this basename is present in the deposition."""
        wanted = os.path.basename(self.local_file())
        return wanted in self._zen.get_files()

    def size(self):
        """Return the remote file size, or the local size if the file is not on Zenodo."""
        if not self.exists():
            return self._iofile.size_local
        return self._stats().filesize

    def mtime(self):
        """Return 0, because no modification time is available for Zenodo files."""
        # There is no mtime info provided by Zenodo.
        # Hence, the files are always considered to be "ancient".
        return 0

    def _download(self):
        """Fetch the file to its local path and verify its MD5 checksum.

        Raises:
            ZenodoFileException: if the MD5 digest of the downloaded data
                differs from the checksum recorded for the file.
        """
        info = self._stats()
        response = self._zen._api_request(info.download)

        digest = hashlib.md5()
        ten_mebibytes = 1024 * 1024 * 10

        # Write the payload chunk by chunk, hashing it along the way.
        with open(self.local_file(), "wb") as target:
            for block in response.iter_content(chunk_size=ten_mebibytes):
                digest.update(block)
                target.write(block)
            computed = digest.hexdigest()

        if computed != info.checksum:
            raise ZenodoFileException(
                "File checksums do not match for remote file id: {}".format(info.id)
            )

    def _upload(self):
        """PUT the local file into the deposition bucket under the remote basename."""
        with open(self.local_file(), "rb") as source:
            destination = self._zen.bucket + "/{}".format(
                os.path.basename(self.remote_file())
            )
            self._zen._api_request(destination, method="PUT", data=source)

    @property
    def list(self):
        """Names of all files currently present in the deposition."""
        names = []
        for filename in self._zen.get_files():
            names.append(filename)
        return names

    @property
    def name(self):
        """The local file path of this object."""
        return self.local_file()


class ZENHelper(object):
    """Small client for the Zenodo deposition REST API.

    Keyword arguments:
        access_token: Zenodo personal access token (required).
        sandbox: if true, talk to https://sandbox.zenodo.org instead of
            https://zenodo.org. Defaults to False.
        deposition: id of an existing deposition. If omitted, a new
            deposition is created during construction.

    Raises:
        WorkflowError: if ``access_token`` is missing, or if a request made
            during construction fails with an HTTP error.
    """

    def __init__(self, *args, **kwargs):
        try:
            self._access_token = kwargs.pop("access_token")
        except KeyError:
            raise WorkflowError(
                "Zenodo personal access token must be passed in as 'access_token' argument.\n"
                "Separate registration and access token is needed for Zenodo sandbox "
                "environment at https://sandbox.zenodo.org."
            )

        self._sandbox = kwargs.pop("sandbox", False)
        self._baseurl = (
            "https://sandbox.zenodo.org" if self._sandbox else "https://zenodo.org"
        )

        if "deposition" in kwargs:
            self.deposition = kwargs.pop("deposition")
            self.bucket = self.get_bucket()
        else:
            # Creating a new deposition, as deposition id was not supplied.
            # Read the fields by key instead of relying on dict ordering.
            created = self.create_deposition()
            self.deposition = created["id"]
            self.bucket = created["bucket"]

    def _api_request(
        self, url, method="GET", data=None, headers=None, files=None, json=False
    ):
        """Send an authenticated request to Zenodo.

        Args:
            url: full URL to request.
            method: HTTP method name.
            data: request body, passed through to ``requests``.
            headers: optional extra headers added on top of the
                ``Authorization`` header.
            files: passed through to ``requests``.
            json: if true, return the decoded JSON body instead of the
                response object.

        Returns:
            The ``requests`` response, or its decoded JSON body if ``json``
            is true.

        Raises:
            WorkflowError: if the server answers with an HTTP error status.
        """
        # The context manager closes the session (and its connection pool)
        # once the request is done. The response body has already been read
        # at that point because the request is not streamed.
        with requests.Session() as session:
            # Hook that raises an error on a bad response status.
            session.hooks = {
                "response": lambda r, *args, **kwargs: r.raise_for_status()
            }
            session.headers["Authorization"] = "Bearer {}".format(self._access_token)
            session.headers.update(headers or {})

            # Run query.
            try:
                r = session.request(method=method, url=url, data=data, files=files)
                if json:
                    return r.json()
                return r
            except HTTPError as e:
                raise WorkflowError("Failed to connect to zenodo", e)

    def create_deposition(self):
        """Create an empty deposition and return its ``id`` and ``bucket`` URL as a dict."""
        resp = self._api_request(
            method="POST",
            url=self._baseurl + "/api/deposit/depositions",
            headers={"Content-Type": "application/json"},
            data="{}",
            json=True,
        )
        return {"id": resp["id"], "bucket": resp["links"]["bucket"]}

    def get_bucket(self):
        """Return the bucket link of the deposition ``self.deposition``."""
        resp = self._api_request(
            self._baseurl + "/api/deposit/depositions/{}".format(self.deposition),
            headers={"Content-Type": "application/json"},
            json=True,
        )
        return resp["links"]["bucket"]

    def get_files(self):
        """Return a dict mapping file basenames to ZenFileInfo for the deposition."""
        files = self._api_request(
            self._baseurl + "/api/deposit/depositions/{}/files".format(self.deposition),
            headers={"Content-Type": "application/json"},
            json=True,
        )
        return {
            os.path.basename(f["filename"]): ZenFileInfo(
                f["checksum"], int(f["filesize"]), f["id"], f["links"]["download"]
            )
            for f in files
        }
8 changes: 8 additions & 0 deletions tests/common.py
Expand Up @@ -54,6 +54,10 @@ def has_gcloud_service_key():
return "GCP_AVAILABLE" in os.environ


def has_zenodo_token():
    """Tell whether the ZENODO_SANDBOX_PAT environment variable is set."""
    token = os.environ.get("ZENODO_SANDBOX_PAT")
    return token is not None


gcloud = pytest.mark.skipif(
not is_connected() or not has_gcloud_service_key(),
reason="Skipping GCLOUD tests because not on "
Expand All @@ -66,6 +70,10 @@ def has_gcloud_service_key():
ci = pytest.mark.skipif(not is_ci(), reason="not in CI")
not_ci = pytest.mark.skipif(is_ci(), reason="skipped in CI")

# Skip Zenodo tests when no sandbox personal access token is available.
# This must be ``skipif``: an unknown mark name such as ``skipid`` is only
# attached as a custom mark and never causes a test to be skipped.
zenodo = pytest.mark.skipif(
    not has_zenodo_token(), reason="no ZENODO_SANDBOX_PAT provided"
)


def copy(src, dst):
if os.path.isdir(src):
Expand Down
30 changes: 30 additions & 0 deletions tests/test_remote_zenodo/Snakefile
@@ -0,0 +1,30 @@
import os
# Needed by the ``except`` clause at the bottom of this file.
from snakemake.exceptions import ZenodoFileException
from snakemake.remote.zenodo import RemoteProvider

# The token for the Zenodo sandbox is taken from the environment. No
# deposition id is passed, so the provider creates a new deposition.
access_token_sandbox=os.environ["ZENODO_SANDBOX_PAT"]
zen_sandbox = RemoteProvider(access_token=access_token_sandbox, sandbox=True)

rule all:
    input: "download.txt", zen_sandbox.remote("large_upload.txt")

# Fetch the previously uploaded file back from the sandbox deposition.
rule download:
    input:
        zen_sandbox.remote("uploaded.txt")
    output:
        "download.txt"
    shell:
        "cp {input} {output}"

# Store the local test file in the sandbox deposition.
rule upload:
    input: "test.txt"
    output:
        zen_sandbox.remote("uploaded.txt")
    shell:
        "cp {input} {output}"

# Write roughly 101 MB of random data to a remote output.
# NOTE(review): this try/except wraps the rule *definition*; an upload
# failure at job execution time is presumably not caught here - confirm.
try:
    rule too_large_upload:
        output: zen_sandbox.remote("large_upload.txt")
        shell: "head -c 101000000 /dev/urandom > {output}"
except ZenodoFileException:
    print("Current Zenodo stable API supports <=100MB per file.")
3 changes: 3 additions & 0 deletions tests/test_remote_zenodo/expected-results/download.txt
@@ -0,0 +1,3 @@
Freedom of self-doubt
6 p.m.
Rising
3 changes: 3 additions & 0 deletions tests/test_remote_zenodo/test.txt
@@ -0,0 +1,3 @@
Freedom of self-doubt
6 p.m.
Rising
6 changes: 6 additions & 0 deletions tests/tests.py
Expand Up @@ -1201,6 +1201,12 @@ def test_output_file_cache_remote():
)


@connected
@zenodo
def test_remote_zenodo():
    """Run the Zenodo remote provider test workflow."""
    workflow_dir = dpath("test_remote_zenodo")
    run(workflow_dir)


def test_multiext():
    """Run the ``test_multiext`` test workflow."""
    workflow_dir = dpath("test_multiext")
    run(workflow_dir)

Expand Down