From 45e97471d43b98cda4d91f7916533bab11cdf90f Mon Sep 17 00:00:00 2001 From: Jeff Dairiki Date: Tue, 27 Feb 2024 10:27:44 -0800 Subject: [PATCH] Better API parameter validation (3.3 branch) (#1182) * test: new record creation confined to `content` tree * fix: better validation of API parameters --- lektor/admin/modules/api.py | 86 ++++++++++++++++++++++++++----------- tests/test_api.py | 18 ++++++++ 2 files changed, 80 insertions(+), 24 deletions(-) diff --git a/lektor/admin/modules/api.py b/lektor/admin/modules/api.py index 0ee909975..ecdee0736 100644 --- a/lektor/admin/modules/api.py +++ b/lektor/admin/modules/api.py @@ -1,5 +1,6 @@ import os import posixpath +from typing import Optional import click from flask import Blueprint @@ -12,12 +13,39 @@ from lektor.constants import PRIMARY_ALT from lektor.publisher import publish from lektor.publisher import PublishError +from lektor.utils import cleanup_path from lektor.utils import is_valid_id bp = Blueprint("api", __name__, url_prefix="/admin/api") +class ValidationError(Exception): + """Invalid input.""" + + +@bp.errorhandler(ValidationError) +def validation_error_handler(error): + message = str(error) + return message, 400 + + +def _validate_path(value: str) -> str: + path = cleanup_path(value) + if path != value: + raise ValidationError("Invalid path") + return path + + +def _validate_alt(value: Optional[str]) -> str: + if value is None: + return PRIMARY_ALT + lektor_config = g.admin_context.pad.config + if not lektor_config.is_valid_alternative(value): + raise ValidationError("Invalid alt") + return value + + def get_record_and_parent(path): pad = g.admin_context.pad record = pad.get(path) @@ -31,8 +59,9 @@ def get_record_and_parent(path): @bp.route("/pathinfo") def get_path_info(): """Returns the path segment information for a record.""" - alt = request.args.get("alt") or PRIMARY_ALT - tree_item = g.admin_context.tree.get(request.args["path"]) + path = _validate_path(request.args["path"]) + alt = _validate_alt(request.args.get("alt")) + tree_item = g.admin_context.tree.get(path) segments = [] while tree_item is not None: @@ -53,8 +82,8 @@ def get_path_info(): @bp.route("/recordinfo") def get_record_info(): - request_path = request.args["path"] - alt = request.args.get("alt") or PRIMARY_ALT + request_path = _validate_path(request.args["path"]) + alt = _validate_alt(request.args.get("alt")) tree_item = g.admin_context.tree.get(request_path) return jsonify( @@ -99,8 +128,9 @@ def get_record_info(): @bp.route("/previewinfo") def get_preview_info(): - alt = request.args.get("alt") or PRIMARY_ALT - record = g.admin_context.pad.get(request.args["path"], alt=alt) + path = _validate_path(request.args["path"]) + alt = _validate_alt(request.args.get("alt")) + record = g.admin_context.pad.get(path, alt=alt) if record is None: return jsonify(exists=False, url=None, is_hidden=True) return jsonify(exists=True, url=record.url_path, is_hidden=record.is_hidden) @@ -108,7 +138,7 @@ def get_preview_info(): @bp.route("/find", methods=["POST"]) def find(): - alt = request.values.get("alt") or PRIMARY_ALT + alt = _validate_alt(request.args.get("alt")) lang = request.values.get("lang") or g.admin_context.info.ui_lang q = request.values.get("q") builder = current_app.lektor_info.get_builder() @@ -117,8 +147,9 @@ def find(): @bp.route("/browsefs", methods=["POST"]) def browsefs(): - alt = request.values.get("alt") or PRIMARY_ALT - record = g.admin_context.pad.get(request.values["path"], alt=alt) + path = _validate_path(request.args["path"]) + alt = _validate_alt(request.args.get("alt")) + record = g.admin_context.pad.get(path, alt=alt) okay = False if record is not None: if record.is_attachment: @@ -143,16 +174,18 @@ def match_url(): @bp.route("/rawrecord") def get_raw_record(): - alt = request.args.get("alt") or PRIMARY_ALT - ts = g.admin_context.tree.edit(request.args["path"], alt=alt) + path = _validate_path(request.args["path"]) + alt = _validate_alt(request.args.get("alt")) + ts = g.admin_context.tree.edit(path, alt=alt) return jsonify(ts.to_json()) @bp.route("/newrecord") def get_new_record_info(): + path = _validate_path(request.args["path"]) + alt = _validate_alt(request.args.get("alt")) pad = g.admin_context.pad - alt = request.args.get("alt") or PRIMARY_ALT - tree_item = g.admin_context.tree.get(request.args["path"]) + tree_item = g.admin_context.tree.get(path) def describe_model(model): primary_field = None @@ -186,8 +219,9 @@ def describe_model(model): @bp.route("/newattachment") def get_new_attachment_info(): - alt = request.args.get("alt") or PRIMARY_ALT - tree_item = g.admin_context.tree.get(request.args["path"]) + path = _validate_path(request.args["path"]) + alt = _validate_alt(request.args.get("alt")) + tree_item = g.admin_context.tree.get(path) label_i18n = tree_item.get_record_label_i18n(alt) return jsonify( { @@ -200,8 +234,9 @@ def get_new_attachment_info(): @bp.route("/newattachment", methods=["POST"]) def upload_new_attachments(): - alt = request.values.get("alt") or PRIMARY_ALT - ts = g.admin_context.tree.edit(request.values["path"], alt=alt) + path = _validate_path(request.values["path"]) + alt = _validate_alt(request.values.get("alt")) + ts = g.admin_context.tree.edit(path, alt=alt) if not ts.exists or ts.is_attachment: return jsonify({"bad_upload": True}) @@ -227,13 +262,14 @@ def upload_new_attachments(): @bp.route("/newrecord", methods=["POST"]) def add_new_record(): values = request.get_json() - alt = values.get("alt") or PRIMARY_ALT + request_path = _validate_path(values["path"]) + alt = _validate_alt(values.get("alt")) exists = False if not is_valid_id(values["id"]): return jsonify(valid_id=False, exists=False, path=None) - path = posixpath.join(values["path"], values["id"]) + path = posixpath.join(request_path, values["id"]) ts = g.admin_context.tree.edit(path, datamodel=values.get("model"), alt=alt) with ts: @@ -247,10 +283,11 @@ def add_new_record(): @bp.route("/deleterecord", methods=["POST"]) def delete_record(): - alt = request.values.get("alt") or PRIMARY_ALT + path = _validate_path(request.values["path"]) + alt = _validate_alt(request.values.get("alt")) delete_master = request.values.get("delete_master") == "1" - if request.values["path"] != "/": - ts = g.admin_context.tree.edit(request.values["path"], alt=alt) + if path != "/": + ts = g.admin_context.tree.edit(path, alt=alt) with ts: ts.delete(delete_master=delete_master) return jsonify(okay=True) @@ -259,9 +296,10 @@ def delete_record(): @bp.route("/rawrecord", methods=["PUT"]) def update_raw_record(): values = request.get_json() + path = _validate_path(values["path"]) + alt = _validate_alt(values.get("alt")) data = values["data"] - alt = values.get("alt") or PRIMARY_ALT - ts = g.admin_context.tree.edit(values["path"], alt=alt) + ts = g.admin_context.tree.edit(path, alt=alt) with ts: ts.data.update(data) return jsonify(path=ts.path) diff --git a/tests/test_api.py b/tests/test_api.py index dae8a29f1..9bf8f199a 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -1,6 +1,7 @@ import json import os from operator import itemgetter +from pathlib import Path import pytest @@ -86,6 +87,23 @@ def test_recordinfo_children_sort_limited_alts(project, env): assert list(sorted(child_data, key=itemgetter("label"))) == child_data +def test_newrecord(scratch_project, scratch_env): + params = {"path": "/", "id": "new", "data": {}} + webadmin = WebAdmin(scratch_env, output_path=scratch_project.tree) + resp = webadmin.test_client().post("/admin/api/newrecord", json=params) + assert resp.status_code == 200 + assert resp.get_json() == {"valid_id": True, "exists": False, "path": "/new"} + assert Path(scratch_project.tree, "content", "new", "contents.lr").exists() + + +def test_newrecord_bad_path(scratch_project, scratch_env): + params = {"path": "/../../templates", "id": "", "data": {}} + webadmin = WebAdmin(scratch_env, output_path=scratch_project.tree) + resp = webadmin.test_client().post("/admin/api/newrecord", json=params) + assert resp.status_code == 400 + assert resp.get_data(as_text=True) == "Invalid path" + + def test_eventstream_yield_bytes(): count = 0