Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better API parameter validation (3.3 branch) #1182

Merged
merged 2 commits into from
Feb 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
86 changes: 62 additions & 24 deletions lektor/admin/modules/api.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
import posixpath
from typing import Optional

import click
from flask import Blueprint
Expand All @@ -12,12 +13,39 @@
from lektor.constants import PRIMARY_ALT
from lektor.publisher import publish
from lektor.publisher import PublishError
from lektor.utils import cleanup_path
from lektor.utils import is_valid_id


bp = Blueprint("api", __name__, url_prefix="/admin/api")


class ValidationError(Exception):
"""Invalid input."""


@bp.errorhandler(ValidationError)
def validation_error_handler(error):
message = str(error)
return message, 400


def _validate_path(value: str) -> str:
path = cleanup_path(value)
if path != value:
raise ValidationError("Invalid path")
return path


def _validate_alt(value: Optional[str]) -> str:
if value is None:
return PRIMARY_ALT
lektor_config = g.admin_context.pad.config
if not lektor_config.is_valid_alternative(value):
raise ValidationError("Invalid alt")
return value


def get_record_and_parent(path):
pad = g.admin_context.pad
record = pad.get(path)
Expand All @@ -31,8 +59,9 @@ def get_record_and_parent(path):
@bp.route("/pathinfo")
def get_path_info():
"""Returns the path segment information for a record."""
alt = request.args.get("alt") or PRIMARY_ALT
tree_item = g.admin_context.tree.get(request.args["path"])
path = _validate_path(request.args["path"])
alt = _validate_alt(request.args.get("alt"))
tree_item = g.admin_context.tree.get(path)
segments = []

while tree_item is not None:
Expand All @@ -53,8 +82,8 @@ def get_path_info():

@bp.route("/recordinfo")
def get_record_info():
request_path = request.args["path"]
alt = request.args.get("alt") or PRIMARY_ALT
request_path = _validate_path(request.args["path"])
alt = _validate_alt(request.args.get("alt"))
tree_item = g.admin_context.tree.get(request_path)

return jsonify(
Expand Down Expand Up @@ -99,16 +128,17 @@ def get_record_info():

@bp.route("/previewinfo")
def get_preview_info():
alt = request.args.get("alt") or PRIMARY_ALT
record = g.admin_context.pad.get(request.args["path"], alt=alt)
path = _validate_path(request.args["path"])
alt = _validate_alt(request.args.get("alt"))
record = g.admin_context.pad.get(path, alt=alt)
if record is None:
return jsonify(exists=False, url=None, is_hidden=True)
return jsonify(exists=True, url=record.url_path, is_hidden=record.is_hidden)


@bp.route("/find", methods=["POST"])
def find():
alt = request.values.get("alt") or PRIMARY_ALT
alt = _validate_alt(request.args.get("alt"))
lang = request.values.get("lang") or g.admin_context.info.ui_lang
q = request.values.get("q")
builder = current_app.lektor_info.get_builder()
Expand All @@ -117,8 +147,9 @@ def find():

@bp.route("/browsefs", methods=["POST"])
def browsefs():
alt = request.values.get("alt") or PRIMARY_ALT
record = g.admin_context.pad.get(request.values["path"], alt=alt)
path = _validate_path(request.args["path"])
alt = _validate_alt(request.args.get("alt"))
record = g.admin_context.pad.get(path, alt=alt)
okay = False
if record is not None:
if record.is_attachment:
Expand All @@ -143,16 +174,18 @@ def match_url():

@bp.route("/rawrecord")
def get_raw_record():
alt = request.args.get("alt") or PRIMARY_ALT
ts = g.admin_context.tree.edit(request.args["path"], alt=alt)
path = _validate_path(request.args["path"])
alt = _validate_alt(request.args.get("alt"))
ts = g.admin_context.tree.edit(path, alt=alt)
return jsonify(ts.to_json())


@bp.route("/newrecord")
def get_new_record_info():
path = _validate_path(request.args["path"])
alt = _validate_alt(request.args.get("alt"))
pad = g.admin_context.pad
alt = request.args.get("alt") or PRIMARY_ALT
tree_item = g.admin_context.tree.get(request.args["path"])
tree_item = g.admin_context.tree.get(path)

def describe_model(model):
primary_field = None
Expand Down Expand Up @@ -186,8 +219,9 @@ def describe_model(model):

@bp.route("/newattachment")
def get_new_attachment_info():
alt = request.args.get("alt") or PRIMARY_ALT
tree_item = g.admin_context.tree.get(request.args["path"])
path = _validate_path(request.args["path"])
alt = _validate_alt(request.args.get("alt"))
tree_item = g.admin_context.tree.get(path)
label_i18n = tree_item.get_record_label_i18n(alt)
return jsonify(
{
Expand All @@ -200,8 +234,9 @@ def get_new_attachment_info():

@bp.route("/newattachment", methods=["POST"])
def upload_new_attachments():
alt = request.values.get("alt") or PRIMARY_ALT
ts = g.admin_context.tree.edit(request.values["path"], alt=alt)
path = _validate_path(request.values["path"])
alt = _validate_alt(request.values.get("alt"))
ts = g.admin_context.tree.edit(path, alt=alt)
if not ts.exists or ts.is_attachment:
return jsonify({"bad_upload": True})

Expand All @@ -227,13 +262,14 @@ def upload_new_attachments():
@bp.route("/newrecord", methods=["POST"])
def add_new_record():
values = request.get_json()
alt = values.get("alt") or PRIMARY_ALT
request_path = _validate_path(values["path"])
alt = _validate_alt(values.get("alt"))
exists = False

if not is_valid_id(values["id"]):
return jsonify(valid_id=False, exists=False, path=None)

path = posixpath.join(values["path"], values["id"])
path = posixpath.join(request_path, values["id"])

ts = g.admin_context.tree.edit(path, datamodel=values.get("model"), alt=alt)
with ts:
Expand All @@ -247,10 +283,11 @@ def add_new_record():

@bp.route("/deleterecord", methods=["POST"])
def delete_record():
alt = request.values.get("alt") or PRIMARY_ALT
path = _validate_path(request.values["path"])
alt = _validate_alt(request.values.get("alt"))
delete_master = request.values.get("delete_master") == "1"
if request.values["path"] != "/":
ts = g.admin_context.tree.edit(request.values["path"], alt=alt)
if path != "/":
ts = g.admin_context.tree.edit(path, alt=alt)
with ts:
ts.delete(delete_master=delete_master)
return jsonify(okay=True)
Expand All @@ -259,9 +296,10 @@ def delete_record():
@bp.route("/rawrecord", methods=["PUT"])
def update_raw_record():
values = request.get_json()
path = _validate_path(values["path"])
alt = _validate_alt(values.get("alt"))
data = values["data"]
alt = values.get("alt") or PRIMARY_ALT
ts = g.admin_context.tree.edit(values["path"], alt=alt)
ts = g.admin_context.tree.edit(path, alt=alt)
with ts:
ts.data.update(data)
return jsonify(path=ts.path)
Expand Down
18 changes: 18 additions & 0 deletions tests/test_api.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import json
import os
from operator import itemgetter
from pathlib import Path

import pytest

Expand Down Expand Up @@ -86,6 +87,23 @@ def test_recordinfo_children_sort_limited_alts(project, env):
assert list(sorted(child_data, key=itemgetter("label"))) == child_data


def test_newrecord(scratch_project, scratch_env):
params = {"path": "/", "id": "new", "data": {}}
webadmin = WebAdmin(scratch_env, output_path=scratch_project.tree)
resp = webadmin.test_client().post("/admin/api/newrecord", json=params)
assert resp.status_code == 200
assert resp.get_json() == {"valid_id": True, "exists": False, "path": "/new"}
assert Path(scratch_project.tree, "content", "new", "contents.lr").exists()


def test_newrecord_bad_path(scratch_project, scratch_env):
params = {"path": "/../../templates", "id": "", "data": {}}
webadmin = WebAdmin(scratch_env, output_path=scratch_project.tree)
resp = webadmin.test_client().post("/admin/api/newrecord", json=params)
assert resp.status_code == 400
assert resp.get_data(as_text=True) == "Invalid path"


def test_eventstream_yield_bytes():
count = 0

Expand Down