Skip to content

Commit

Permalink
Better API parameter validation (3.3 branch) (#1182)
Browse files Browse the repository at this point in the history
* test: new record creation confined to `content` tree

* fix: better validation of API parameters
  • Loading branch information
dairiki committed Feb 27, 2024
1 parent aef3c90 commit 45e9747
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 24 deletions.
86 changes: 62 additions & 24 deletions lektor/admin/modules/api.py
@@ -1,5 +1,6 @@
import os
import posixpath
from typing import Optional

import click
from flask import Blueprint
Expand All @@ -12,12 +13,39 @@
from lektor.constants import PRIMARY_ALT
from lektor.publisher import publish
from lektor.publisher import PublishError
from lektor.utils import cleanup_path
from lektor.utils import is_valid_id


bp = Blueprint("api", __name__, url_prefix="/admin/api")


class ValidationError(Exception):
"""Invalid input."""


@bp.errorhandler(ValidationError)
def validation_error_handler(error):
message = str(error)
return message, 400


def _validate_path(value: str) -> str:
path = cleanup_path(value)
if path != value:
raise ValidationError("Invalid path")
return path


def _validate_alt(value: Optional[str]) -> str:
if value is None:
return PRIMARY_ALT
lektor_config = g.admin_context.pad.config
if not lektor_config.is_valid_alternative(value):
raise ValidationError("Invalid alt")
return value


def get_record_and_parent(path):
pad = g.admin_context.pad
record = pad.get(path)
Expand All @@ -31,8 +59,9 @@ def get_record_and_parent(path):
@bp.route("/pathinfo")
def get_path_info():
"""Returns the path segment information for a record."""
alt = request.args.get("alt") or PRIMARY_ALT
tree_item = g.admin_context.tree.get(request.args["path"])
path = _validate_path(request.args["path"])
alt = _validate_alt(request.args.get("alt"))
tree_item = g.admin_context.tree.get(path)
segments = []

while tree_item is not None:
Expand All @@ -53,8 +82,8 @@ def get_path_info():

@bp.route("/recordinfo")
def get_record_info():
request_path = request.args["path"]
alt = request.args.get("alt") or PRIMARY_ALT
request_path = _validate_path(request.args["path"])
alt = _validate_alt(request.args.get("alt"))
tree_item = g.admin_context.tree.get(request_path)

return jsonify(
Expand Down Expand Up @@ -99,16 +128,17 @@ def get_record_info():

@bp.route("/previewinfo")
def get_preview_info():
alt = request.args.get("alt") or PRIMARY_ALT
record = g.admin_context.pad.get(request.args["path"], alt=alt)
path = _validate_path(request.args["path"])
alt = _validate_alt(request.args.get("alt"))
record = g.admin_context.pad.get(path, alt=alt)
if record is None:
return jsonify(exists=False, url=None, is_hidden=True)
return jsonify(exists=True, url=record.url_path, is_hidden=record.is_hidden)


@bp.route("/find", methods=["POST"])
def find():
alt = request.values.get("alt") or PRIMARY_ALT
alt = _validate_alt(request.args.get("alt"))
lang = request.values.get("lang") or g.admin_context.info.ui_lang
q = request.values.get("q")
builder = current_app.lektor_info.get_builder()
Expand All @@ -117,8 +147,9 @@ def find():

@bp.route("/browsefs", methods=["POST"])
def browsefs():
alt = request.values.get("alt") or PRIMARY_ALT
record = g.admin_context.pad.get(request.values["path"], alt=alt)
path = _validate_path(request.args["path"])
alt = _validate_alt(request.args.get("alt"))
record = g.admin_context.pad.get(path, alt=alt)
okay = False
if record is not None:
if record.is_attachment:
Expand All @@ -143,16 +174,18 @@ def match_url():

@bp.route("/rawrecord")
def get_raw_record():
alt = request.args.get("alt") or PRIMARY_ALT
ts = g.admin_context.tree.edit(request.args["path"], alt=alt)
path = _validate_path(request.args["path"])
alt = _validate_alt(request.args.get("alt"))
ts = g.admin_context.tree.edit(path, alt=alt)
return jsonify(ts.to_json())


@bp.route("/newrecord")
def get_new_record_info():
path = _validate_path(request.args["path"])
alt = _validate_alt(request.args.get("alt"))
pad = g.admin_context.pad
alt = request.args.get("alt") or PRIMARY_ALT
tree_item = g.admin_context.tree.get(request.args["path"])
tree_item = g.admin_context.tree.get(path)

def describe_model(model):
primary_field = None
Expand Down Expand Up @@ -186,8 +219,9 @@ def describe_model(model):

@bp.route("/newattachment")
def get_new_attachment_info():
alt = request.args.get("alt") or PRIMARY_ALT
tree_item = g.admin_context.tree.get(request.args["path"])
path = _validate_path(request.args["path"])
alt = _validate_alt(request.args.get("alt"))
tree_item = g.admin_context.tree.get(path)
label_i18n = tree_item.get_record_label_i18n(alt)
return jsonify(
{
Expand All @@ -200,8 +234,9 @@ def get_new_attachment_info():

@bp.route("/newattachment", methods=["POST"])
def upload_new_attachments():
alt = request.values.get("alt") or PRIMARY_ALT
ts = g.admin_context.tree.edit(request.values["path"], alt=alt)
path = _validate_path(request.values["path"])
alt = _validate_alt(request.values.get("alt"))
ts = g.admin_context.tree.edit(path, alt=alt)
if not ts.exists or ts.is_attachment:
return jsonify({"bad_upload": True})

Expand All @@ -227,13 +262,14 @@ def upload_new_attachments():
@bp.route("/newrecord", methods=["POST"])
def add_new_record():
values = request.get_json()
alt = values.get("alt") or PRIMARY_ALT
request_path = _validate_path(values["path"])
alt = _validate_alt(values.get("alt"))
exists = False

if not is_valid_id(values["id"]):
return jsonify(valid_id=False, exists=False, path=None)

path = posixpath.join(values["path"], values["id"])
path = posixpath.join(request_path, values["id"])

ts = g.admin_context.tree.edit(path, datamodel=values.get("model"), alt=alt)
with ts:
Expand All @@ -247,10 +283,11 @@ def add_new_record():

@bp.route("/deleterecord", methods=["POST"])
def delete_record():
alt = request.values.get("alt") or PRIMARY_ALT
path = _validate_path(request.values["path"])
alt = _validate_alt(request.values.get("alt"))
delete_master = request.values.get("delete_master") == "1"
if request.values["path"] != "/":
ts = g.admin_context.tree.edit(request.values["path"], alt=alt)
if path != "/":
ts = g.admin_context.tree.edit(path, alt=alt)
with ts:
ts.delete(delete_master=delete_master)
return jsonify(okay=True)
Expand All @@ -259,9 +296,10 @@ def delete_record():
@bp.route("/rawrecord", methods=["PUT"])
def update_raw_record():
values = request.get_json()
path = _validate_path(values["path"])
alt = _validate_alt(values.get("alt"))
data = values["data"]
alt = values.get("alt") or PRIMARY_ALT
ts = g.admin_context.tree.edit(values["path"], alt=alt)
ts = g.admin_context.tree.edit(path, alt=alt)
with ts:
ts.data.update(data)
return jsonify(path=ts.path)
Expand Down
18 changes: 18 additions & 0 deletions tests/test_api.py
@@ -1,6 +1,7 @@
import json
import os
from operator import itemgetter
from pathlib import Path

import pytest

Expand Down Expand Up @@ -86,6 +87,23 @@ def test_recordinfo_children_sort_limited_alts(project, env):
assert list(sorted(child_data, key=itemgetter("label"))) == child_data


def test_newrecord(scratch_project, scratch_env):
params = {"path": "/", "id": "new", "data": {}}
webadmin = WebAdmin(scratch_env, output_path=scratch_project.tree)
resp = webadmin.test_client().post("/admin/api/newrecord", json=params)
assert resp.status_code == 200
assert resp.get_json() == {"valid_id": True, "exists": False, "path": "/new"}
assert Path(scratch_project.tree, "content", "new", "contents.lr").exists()


def test_newrecord_bad_path(scratch_project, scratch_env):
params = {"path": "/../../templates", "id": "", "data": {}}
webadmin = WebAdmin(scratch_env, output_path=scratch_project.tree)
resp = webadmin.test_client().post("/admin/api/newrecord", json=params)
assert resp.status_code == 400
assert resp.get_data(as_text=True) == "Invalid path"


def test_eventstream_yield_bytes():
count = 0

Expand Down

0 comments on commit 45e9747

Please sign in to comment.