diff --git a/rdiffweb/controller/cherrypy_wtf.py b/rdiffweb/controller/cherrypy_wtf.py
index 1b83ace5..81dd243f 100644
--- a/rdiffweb/controller/cherrypy_wtf.py
+++ b/rdiffweb/controller/cherrypy_wtf.py
@@ -1,5 +1,26 @@
-from wtforms.form import Form
+# -*- coding: utf-8 -*-
+# rdiffweb, A web interface to rdiff-backup repositories
+# Copyright (C) 2012-2021 rdiffweb contributors
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+import os
+from datetime import timedelta
+
import cherrypy
+from wtforms.csrf.session import SessionCSRF
+from wtforms.form import Form
SUBMIT_METHODS = {'POST', 'PUT', 'PATCH', 'DELETE'}
@@ -26,14 +47,26 @@ def getlist(self, key):
_AUTO = _ProxyFormdata()
+# Provide a default secret
+_CSRF_SECRET = os.environ.get('RDIFFWEB_CSRF_SECRET', 'b5WPUOw3Zd7gr1ligLilAAUA9zuWkW41')
+
class CherryForm(Form):
"""
Custom implementation of WTForm for cherrypy to support kwargs parms.
-
+
If ``formdata`` is not specified, this will use cherrypy.request.params
Explicitly pass ``formdata=None`` to prevent this.
"""
+ class Meta:
+ csrf = True
+ csrf_class = SessionCSRF
+ csrf_secret = _CSRF_SECRET.encode('utf-8')
+ csrf_time_limit = timedelta(minutes=30)
+
+ @property
+ def csrf_context(self):
+ return cherrypy.session
def __init__(self, formdata=_AUTO, **kwargs):
super().__init__(formdata=formdata, **kwargs)
@@ -51,3 +84,8 @@ def validate_on_submit(self):
This is a shortcut for ``form.is_submitted() and form.validate()``.
"""
return self.is_submitted() and self.validate()
+
+ @property
+ def error_message(self):
+ if self.errors:
+ return ' '.join(['%s: %s' % (field, ', '.join(messages)) for field, messages in self.errors.items()])
diff --git a/rdiffweb/controller/page_admin.py b/rdiffweb/controller/page_admin.py
index 058e77d0..d914ef61 100644
--- a/rdiffweb/controller/page_admin.py
+++ b/rdiffweb/controller/page_admin.py
@@ -22,22 +22,21 @@
import pwd
import subprocess
import sys
+from collections import OrderedDict
import cherrypy
import humanfriendly
import psutil
-from wtforms import validators, widgets
-from wtforms.fields.core import StringField, SelectField, Field
-from wtforms.fields.html5 import EmailField
-from wtforms.fields.simple import PasswordField
-
from rdiffweb.controller import Controller, flash
from rdiffweb.controller.cherrypy_wtf import CherryForm
from rdiffweb.core.config import Option
from rdiffweb.core.i18n import ugettext as _
from rdiffweb.core.quota import QuotaUnsupported
from rdiffweb.core.store import ADMIN_ROLE, MAINTAINER_ROLE, USER_ROLE
-from collections import OrderedDict
+from wtforms import validators, widgets
+from wtforms.fields.core import Field, SelectField, StringField
+from wtforms.fields.html5 import EmailField
+from wtforms.fields.simple import PasswordField
# Define the logger
logger = logging.getLogger(__name__)
@@ -49,12 +48,18 @@ def get_pyinfo():
yield _('OS Version'), '%s %s (%s %s)' % (platform.system(), platform.release(), distro.linux_distribution()[0].capitalize(), distro.linux_distribution()[1])
except:
yield _('OS Version'), '%s %s' % (platform.system(), platform.release())
- if hasattr(os, 'path'): yield _('OS Path'), os.environ['PATH']
- if hasattr(sys, 'version'): yield _('Python Version'), ''.join(sys.version)
- if hasattr(sys, 'subversion'): yield _('Python Subversion'), ', '.join(sys.subversion)
- if hasattr(sys, 'prefix'): yield _('Python Prefix'), sys.prefix
- if hasattr(sys, 'executable'): yield _('Python Executable'), sys.executable
- if hasattr(sys, 'path'): yield _('Python Path'), ', '.join(sys.path)
+ if hasattr(os, 'path'):
+ yield _('OS Path'), os.environ['PATH']
+ if hasattr(sys, 'version'):
+ yield _('Python Version'), ''.join(sys.version)
+ if hasattr(sys, 'subversion'):
+ yield _('Python Subversion'), ', '.join(sys.subversion)
+ if hasattr(sys, 'prefix'):
+ yield _('Python Prefix'), sys.prefix
+ if hasattr(sys, 'executable'):
+ yield _('Python Executable'), sys.executable
+ if hasattr(sys, 'path'):
+ yield _('Python Path'), ', '.join(sys.path)
def get_osinfo():
@@ -71,7 +76,8 @@ def pw_name(uid):
except:
return
- if hasattr(sys, 'getfilesystemencoding'): yield _('File System Encoding'), sys.getfilesystemencoding()
+ if hasattr(sys, 'getfilesystemencoding'):
+ yield _('File System Encoding'), sys.getfilesystemencoding()
if hasattr(os, 'getcwd'):
yield _('Current Working Directory'), os.getcwd()
if hasattr(os, 'getegid'):
@@ -167,10 +173,9 @@ class UserForm(CherryForm):
validators=[validators.optional()],
description=_("Disk spaces (in bytes) used by this user."))
- @property
- def error_message(self):
- if self.errors:
- return ' '.join(['%s: %s' % (field, ', '.join(messages)) for field, messages in self.errors.items()])
+
+class DeleteUserForm(CherryForm):
+ username = StringField(_('Username'), validators=[validators.required()])
@cherrypy.tools.is_admin()
@@ -180,6 +185,67 @@ class AdminPage(Controller):
logfile = Option('log_file')
logaccessfile = Option('log_access_file')
+ def _add_or_edit_user(self, action, form):
+ assert action in ['add', 'edit']
+ assert form
+ # Validate form.
+ if not form.validate():
+ flash(form.error_message, level='error')
+ return
+ if action == 'add':
+ user = self.app.store.add_user(form.username.data, form.password.data)
+ else:
+ user = self.app.store.get_user(form.username.data)
+ if form.password.data:
+ user.set_password(form.password.data, old_password=None)
+ # Don't allow the user to changes it's "role" state.
+ if form.username.data != self.app.currentuser.username:
+ user.role = form.role.data
+ user.email = form.email.data or ''
+ if form.user_root.data:
+ user.user_root = form.user_root.data
+ if not user.valid_user_root():
+ flash(_("User's root directory %s is not accessible!") % user.user_root, level='error')
+ logger.warning("user's root directory %s is not accessible" % user.user_root)
+ # Try to update disk quota if the human readable value changed.
+ # Report error using flash.
+ if form.disk_quota and form.disk_quota.data:
+ try:
+ new_quota = form.disk_quota.data
+ old_quota = humanfriendly.parse_size(humanfriendly.format_size(user.disk_quota, binary=True))
+ if old_quota != new_quota:
+ user.disk_quota = new_quota
+ flash(_("User's quota updated"), level='success')
+ except QuotaUnsupported as e:
+ flash(_("Setting user's quota is not supported"), level='warning')
+ except Exception as e:
+ flash(_("Failed to update user's quota: %s") % e, level='error')
+ logger.warning("failed to update user's quota", exc_info=1)
+ if action == 'add':
+ flash(_("User added successfully."))
+ else:
+ flash(_("User information modified successfully."))
+
+ def _delete_user(self, action, form):
+ assert action == 'delete'
+ assert form
+ # Validate form.
+ if not form.validate():
+ flash(form.error_message, level='error')
+ return
+ if form.username.data == self.app.currentuser.username:
+ flash(_("You cannot remove your own account!"), level='error')
+ else:
+ try:
+ user = self.app.store.get_user(form.username.data)
+ if user:
+ user.delete()
+ flash(_("User account removed."))
+ else:
+ flash(_("User doesn't exists!"), level='warning')
+ except ValueError as e:
+ flash(e, level='error')
+
def _get_log_files(self):
"""
Return a list of log files to be shown in admin area.
@@ -226,57 +292,10 @@ def users(self, criteria=u"", search=u"", action=u"", **kwargs):
# If we're just showing the initial page, just do that
form = UserForm()
- if action in ["add", "edit"] and not form.validate():
- # Display the form error to user using flash
- flash(form.error_message, level='error')
-
- elif action in ["add", "edit"] and form.validate():
- if action == 'add':
- user = self.app.store.add_user(form.username.data, form.password.data)
- else:
- user = self.app.store.get_user(form.username.data)
- if form.password.data:
- user.set_password(form.password.data, old_password=None)
- # Don't allow the user to changes it's "role" state.
- if form.username.data != self.app.currentuser.username:
- user.role = form.role.data
- user.email = form.email.data or ''
- if form.user_root.data:
- user.user_root = form.user_root.data
- if not user.valid_user_root():
- flash(_("User's root directory %s is not accessible!") % user.user_root, level='error')
- logger.warning("user's root directory %s is not accessible" % user.user_root)
- # Try to update disk quota if the human readable value changed.
- # Report error using flash.
- if form.disk_quota and form.disk_quota.data:
- try:
- new_quota = form.disk_quota.data
- old_quota = humanfriendly.parse_size(humanfriendly.format_size(user.disk_quota, binary=True))
- if old_quota != new_quota:
- user.disk_quota = new_quota
- flash(_("User's quota updated"), level='success')
- except QuotaUnsupported as e:
- flash(_("Setting user's quota is not supported"), level='warning')
- except Exception as e:
- flash(_("Failed to update user's quota: %s") % e, level='error')
- logger.warning("failed to update user's quota", exc_info=1)
- if action == 'add':
- flash(_("User added successfully."))
- else:
- flash(_("User information modified successfully."))
+ if action in ["add", "edit"]:
+ self._add_or_edit_user(action, form)
elif action == 'delete':
- if form.username.data == self.app.currentuser.username:
- flash(_("You cannot remove your own account!"), level='error')
- else:
- try:
- user = self.app.store.get_user(form.username.data)
- if user:
- user.delete()
- flash(_("User account removed."))
- else:
- flash(_("User doesn't exists!"), level='warning')
- except ValueError as e:
- flash(e, level='error')
+ self._delete_user(action, DeleteUserForm())
params = {
"form": UserForm(formdata=None),
diff --git a/rdiffweb/controller/page_delete.py b/rdiffweb/controller/page_delete.py
index 5e0663ba..e9af5127 100644
--- a/rdiffweb/controller/page_delete.py
+++ b/rdiffweb/controller/page_delete.py
@@ -24,31 +24,42 @@
import logging
import cherrypy
-from rdiffweb.controller import Controller, validate
+from rdiffweb.controller import Controller, flash
+from rdiffweb.controller.cherrypy_wtf import CherryForm
from rdiffweb.controller.dispatch import poppath
from rdiffweb.controller.filter_authorization import is_maintainer
-
+from rdiffweb.core.i18n import ugettext as _
+from wtforms import validators
+from wtforms.fields.core import StringField
_logger = logging.getLogger(__name__)
+class DeleteRepoForm(CherryForm):
+ confirm = StringField(_('Confirmation'), validators=[validators.required()])
+ redirect = StringField(default='/')
+
+
@poppath()
class DeletePage(Controller):
@cherrypy.expose
- def default(self, path=b"", confirm=None, redirect='/', **kwargs):
+ def default(self, path=b"", **kwargs):
# Check permissions on path/repo
unused, path_obj = self.app.store.get_repo_path(path)
# Check user's permissions
is_maintainer()
+ # validate form
+ form = DeleteRepoForm()
+ if not form.validate():
+ raise cherrypy.HTTPError(400, form.error_message)
+
# Validate the name
- validate(confirm)
- if confirm != path_obj.display_name:
- _logger.info("do not delete repo, bad confirmation %r != %r", confirm, path_obj.display_name)
- raise cherrypy.HTTPError(400)
+ if form.confirm.data != path_obj.display_name:
+ _logger.info("do not delete repo, bad confirmation %r != %r", form.confirm.data, path_obj.display_name)
+ raise cherrypy.HTTPError(400, 'bad confirmation')
# Delete repository
self.app.scheduler.add_task(path_obj.delete, args=[])
-
- raise cherrypy.HTTPRedirect(redirect)
+ raise cherrypy.HTTPRedirect(form.redirect.data)
diff --git a/rdiffweb/controller/pref_sshkeys.py b/rdiffweb/controller/pref_sshkeys.py
index faaff885..4900a4f4 100644
--- a/rdiffweb/controller/pref_sshkeys.py
+++ b/rdiffweb/controller/pref_sshkeys.py
@@ -47,7 +47,7 @@ def validate_key(unused_form, field):
raise ValidationError(_("Invalid SSH key."))
-class SSHForm(CherryForm):
+class SshForm(CherryForm):
title = StringField(
_('Title'),
description=_('The title is an optional description to identify the key. e.g.: bob@thinkpad-t530'),
@@ -60,6 +60,10 @@ class SSHForm(CherryForm):
fingerprint = StringField('Fingerprint')
+class DeleteSshForm(CherryForm):
+ fingerprint = StringField('Fingerprint')
+
+
class SSHKeysPlugin(Controller):
"""
Plugin to configure SSH keys.
@@ -69,30 +73,45 @@ class SSHKeysPlugin(Controller):
panel_name = _('SSH Keys')
+ def _add_key(self, action, form):
+ assert action == 'add'
+ assert form
+ if not form.validate():
+ for unused, messages in form.errors.items():
+ for message in messages:
+ flash(message, level='warning')
+ return
+ try:
+ self.app.currentuser.add_authorizedkey(key=form.key.data, comment=form.title.data)
+ except DuplicateSSHKeyError as e:
+ flash(str(e), level='error')
+ except:
+ flash(_("Unknown error while adding the SSH Key"), level='error')
+ _logger.warning("error adding ssh key", exc_info=1)
+
+ def _delete_key(self, action, form):
+ assert action == 'delete'
+ assert form
+ if not form.validate():
+ for unused, messages in form.errors.items():
+ for message in messages:
+ flash(message, level='warning')
+ return
+ is_maintainer()
+ try:
+ self.app.currentuser.delete_authorizedkey(form.fingerprint.data)
+ except:
+ flash(_("Unknown error while removing the SSH Key"), level='error')
+ _logger.warning("error removing ssh key", exc_info=1)
+
def render_prefs_panel(self, panelid, action=None, **kwargs): # @UnusedVariable
# Handle action
- form = SSHForm()
- if action == "add" and not form.validate():
- for unused_field, messages in form.errors.items():
- for message in messages:
- flash(message, level='warning')
- elif action == 'add':
- # Add the key to the current user.
- try:
- self.app.currentuser.add_authorizedkey(key=form.key.data, comment=form.title.data)
- except DuplicateSSHKeyError as e:
- flash(str(e), level='error')
- except:
- flash(_("Unknown error while adding the SSH Key"), level='error')
- _logger.warning("error adding ssh key", exc_info=1)
+ form = SshForm()
+ if action == "add":
+ self._add_key(action, form)
elif action == 'delete':
- is_maintainer()
- try:
- self.app.currentuser.delete_authorizedkey(form.fingerprint.data)
- except:
- flash(_("Unknown error while removing the SSH Key"), level='error')
- _logger.warning("error removing ssh key", exc_info=1)
+ self._delete_key(action, DeleteSshForm())
# Get SSH keys if file exists.
params = {
diff --git a/rdiffweb/controller/tests/test_page_admin.py b/rdiffweb/controller/tests/test_page_admin.py
index f4eef7d4..8d3e07e6 100644
--- a/rdiffweb/controller/tests/test_page_admin.py
+++ b/rdiffweb/controller/tests/test_page_admin.py
@@ -20,9 +20,7 @@
@author: Patrik Dufresne
"""
-import logging
import os
-import unittest
from unittest.mock import ANY, MagicMock
import rdiffweb.test
@@ -33,7 +31,7 @@
class AbstractAdminTest(rdiffweb.test.WebCase):
"""Class to regroup command method to test admin page."""
- def _add_user(self, username=None, email=None, password=None, user_root=None, role=None):
+ def _add_user(self, username=None, email=None, password=None, user_root=None, role=None, csrf_token=None):
b = {}
b['action'] = 'add'
if username is not None:
@@ -46,9 +44,10 @@ def _add_user(self, username=None, email=None, password=None, user_root=None, ro
b['user_root'] = user_root
if role is not None:
b['role'] = str(role)
+ b['csrf_token'] = csrf_token or self.getCsrfToken("/admin/users/")
self.getPage("/admin/users/", method='POST', body=b)
- def _edit_user(self, username=None, email=None, password=None, user_root=None, role=None, disk_quota=None):
+ def _edit_user(self, username=None, email=None, password=None, user_root=None, role=None, disk_quota=None, csrf_token=None):
b = {}
b['action'] = 'edit'
if username is not None:
@@ -63,11 +62,13 @@ def _edit_user(self, username=None, email=None, password=None, user_root=None, r
b['role'] = str(role)
if disk_quota is not None:
b['disk_quota'] = disk_quota
+ b['csrf_token'] = csrf_token or self.getCsrfToken("/admin/users/")
self.getPage("/admin/users/", method='POST', body=b)
- def _delete_user(self, username='test1'):
+ def _delete_user(self, username='test1', csrf_token=None):
b = {'action': 'delete',
'username': username}
+ b['csrf_token'] = csrf_token or self.getCsrfToken("/admin/users/")
self.getPage("/admin/users/", method='POST', body=b)
@@ -76,15 +77,50 @@ class AdminUsersAsAdminTest(AbstractAdminTest):
login = True
+ def test_add_user_without_csrf(self):
+ # Given I'm logging as Admin
+ # When trying add a user with invalid csrf token
+ self._add_user("admin_role", "admin_role@test.com", "test2", "/home/", ADMIN_ROLE, csrf_token='1234567890##invalid')
+ # Then the form is refused
+ self.assertStatus(200)
+ self.assertInBody('CSRF failed')
+ # Then user is not created
+ self.assertIsNone(self.app.store.get_user('admin_role'))
+
+ def test_edit_user_without_csrf(self):
+ # Given an existing user
+ self._add_user("myuser", "myuser@test.com", "test2", "/home/", MAINTAINER_ROLE)
+ # When trying edit a user with invalid csrf token
+ self._edit_user(username='myuser', email="newemail@test.com", csrf_token='1234567890##invalid')
+ # Then the form is refused
+ self.assertStatus(200)
+ self.assertInBody('CSRF failed')
+ # Then user is not updated
+ self.assertEqual("myuser@test.com", self.app.store.get_user('myuser').email)
+
+ def test_delete_user_without_csrf(self):
+ # Given an existing user
+ self._add_user("myuser", "myuser@test.com", "test2", "/home/", MAINTAINER_ROLE)
+ # When trying edit a user with invalid csrf token
+ self._delete_user(username='myuser', csrf_token='1234567890##invalid')
+ # Then the form is refused
+ self.assertStatus(200)
+ self.assertInBody('CSRF failed')
+ # Then user is not delete
+ self.assertIsNotNone(self.app.store.get_user('myuser').email)
+
def test_add_user_with_role(self):
# Add user to be listed
self._add_user("admin_role", "admin_role@test.com", "test2", "/home/", ADMIN_ROLE)
+ self.assertStatus(200)
self.assertEqual(ADMIN_ROLE, self.app.store.get_user('admin_role').role)
self._add_user("maintainer_role", "maintainer_role@test.com", "test2", "/home/", MAINTAINER_ROLE)
+ self.assertStatus(200)
self.assertEqual(MAINTAINER_ROLE, self.app.store.get_user('maintainer_role').role)
self._add_user("user_role", "user_role@test.com", "test2", "/home/", USER_ROLE)
+ self.assertStatus(200)
self.assertEqual(USER_ROLE, self.app.store.get_user('user_role').role)
def test_add_user_with_invalid_role(self):
@@ -202,7 +238,7 @@ def test_delete_user_admin(self):
Verify failure to delete our self.
"""
# Create another admin user
- self._add_user('admin2', '', 'password', '' , ADMIN_ROLE)
+ self._add_user('admin2', '', 'password', '', ADMIN_ROLE)
self.getPage("/logout/")
self._login('admin2', 'password')
@@ -348,21 +384,21 @@ def test_add_user(self):
"""
Check if adding user is forbidden.
"""
- self._add_user("test2", "test2@test.com", "test2", "/tmp/", USER_ROLE)
+ self._add_user("test2", "test2@test.com", "test2", "/tmp/", USER_ROLE, csrf_token='1234567890##invalid')
self.assertStatus(403)
def test_delete_user(self):
"""
Check if deleting user is forbidden.
"""
- self._delete_user("test")
+ self._delete_user("test", csrf_token='1234567890##invalid')
self.assertStatus(403)
def test_edit_user(self):
"""
Check if editing user is forbidden.
"""
- self._edit_user("test", "test1@test.com", "test", "/var/invalid/", USER_ROLE)
+ self._edit_user("test", "test1@test.com", "test", "/var/invalid/", USER_ROLE, csrf_token='1234567890##invalid')
self.assertStatus(403)
def test_users(self):
diff --git a/rdiffweb/controller/tests/test_page_delete.py b/rdiffweb/controller/tests/test_page_delete.py
index 1b9c3c00..5ca9b644 100644
--- a/rdiffweb/controller/tests/test_page_delete.py
+++ b/rdiffweb/controller/tests/test_page_delete.py
@@ -37,12 +37,13 @@ class DeleteRepoTest(rdiffweb.test.WebCase):
login = True
- def _delete(self, user, repo, confirm, **kwargs):
+ def _delete(self, user, repo, confirm, redirect=None, csrf_token=None):
body = {}
if confirm is not None:
body.update({'confirm': confirm})
- if kwargs:
- body.update(kwargs)
+ if redirect is not None:
+ body['redirect'] = redirect
+ body['csrf_token'] = csrf_token or self.getCsrfToken('/prefs/sshkeys')
self.getPage("/delete/" + user + "/" + repo + "/", method="POST", body=body)
@skipIf(RDIFF_BACKUP_VERSION < (2, 0, 1), "rdiff-backup-delete is available since 2.0.1")
@@ -61,6 +62,18 @@ def test_delete_path(self):
with self.assertRaises(DoesNotExistError):
self.app.store.get_repo_path('admin/testcases/Revisions', as_user=admin)
+ def test_delete_with_csrf_token(self):
+ # Given an authenticated user with an existing repositor
+ admin = self.app.store.get_user('admin')
+ self.app.store.get_repo_path('admin/testcases/Revisions', as_user=admin)
+ # When trying to delete a repository without a valide csrf token
+ self._delete(self.USERNAME, 'testcases/Revisions', 'Revisions', csrf_token='1234567890##invalid')
+ # Then the form is refused
+ self.assertStatus(400)
+ self.assertInBody('CSRF failed')
+ # Then repository is not deleted.
+ self.assertIsNotNone(self.app.store.get_repo_path('admin/testcases/Revisions', as_user=admin))
+
@skipIf(RDIFF_BACKUP_VERSION < (2, 0, 1), "rdiff-backup-delete is available since 2.0.1")
def test_delete_path_with_wrong_confirm(self):
# Check directory exists
@@ -118,8 +131,9 @@ def test_delete_repo_without_confirm(self):
self.assertEqual(['broker-repo', 'testcases'], [r.name for r in self.app.store.get_user('admin').repo_objs])
# Delete repo without confirmation.
self._delete(self.USERNAME, self.REPO, None)
- # TODO Make sure the repository is not delete
+ # Make sure the repository is not delete
self.assertStatus(400)
+ self.assertInBody('confirm: This field is required')
self.assertEqual(['broker-repo', 'testcases'], [r.name for r in self.app.store.get_user('admin').repo_objs])
def test_delete_repo_as_admin(self):
diff --git a/rdiffweb/controller/tests/test_page_prefs_ssh.py b/rdiffweb/controller/tests/test_page_prefs_ssh.py
index 325adea2..6206823f 100644
--- a/rdiffweb/controller/tests/test_page_prefs_ssh.py
+++ b/rdiffweb/controller/tests/test_page_prefs_ssh.py
@@ -21,31 +21,30 @@
"""
-import logging
-import unittest
-
import rdiffweb.test
+PREFS_SSHKEYS = "/prefs/sshkeys/"
-class SSHKeysTest(rdiffweb.test.WebCase):
- PREFS_SSHKEYS = "/prefs/sshkeys/"
+class SSHKeysTest(rdiffweb.test.WebCase):
login = True
- def _delete_ssh_key(self, fingerprint):
+ def _delete_ssh_key(self, fingerprint, csrf_token=None):
b = {'action': 'delete',
'fingerprint': fingerprint}
- self.getPage(self.PREFS_SSHKEYS, method='POST', body=b)
+ b['csrf_token'] = csrf_token or self.getCsrfToken(PREFS_SSHKEYS)
+ self.getPage(PREFS_SSHKEYS, method='POST', body=b)
- def _add_ssh_key(self, title, key):
+ def _add_ssh_key(self, title, key, csrf_token=None):
b = {'action': 'add',
'title': title,
'key': key}
- self.getPage(self.PREFS_SSHKEYS, method='POST', body=b)
+ b['csrf_token'] = csrf_token or self.getCsrfToken(PREFS_SSHKEYS)
+ self.getPage(PREFS_SSHKEYS, method='POST', body=b)
def test_page(self):
- self.getPage(self.PREFS_SSHKEYS)
+ self.getPage(PREFS_SSHKEYS)
self.assertStatus('200 OK')
def test_add(self):
@@ -61,10 +60,25 @@ def test_add(self):
self.assertEqual(1, len(list(user.authorizedkeys)))
# Show page
- self.getPage(self.PREFS_SSHKEYS)
+ self.getPage(PREFS_SSHKEYS)
self.assertInBody("test@mysshkey")
self.assertInBody("4d:42:8b:35:e5:55:71:f7:b3:0d:58:f9:b1:2c:9e:91")
+ def test_add_without_csrf_token(self):
+ # Given an authenticated user.
+ # When trying to add a key without valid csrf token.
+ self._add_ssh_key(
+ "test@mysshkey",
+ "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDSEN5VTn9MLituZvdYTZMbZEaMxe0UuU7BelxHkvxzSpVWtazrIBEc3KZjtVoK9F3+0kd26P4DzSQuPUl3yZDgyZZeXrF6p2GlEA7A3tPuOEsAQ9c0oTiDYktq5/Go8vD+XAZKLd//qmCWW1Jg4datkWchMKJzbHUgBrBH015FDbGvGDWYTfVyb8I9H+LQ0GmbTHsuTu63DhPODncMtWPuS9be/flb4EEojMIx5Vce0SNO9Eih38W7jTvNWxZb75k5yfPJxBULRnS5v/fPnDVVtD3JSGybSwKoMdsMX5iImAeNhqnvd8gBu1f0IycUQexTbJXk1rPiRcF13SjKrfXz ikus060@ikus060-t530",
+ csrf_token='1234567890##invalid',
+ )
+ # Then form is refused.
+ self.assertStatus(200)
+ self.assertInBody('CSRF failed')
+ # Then ssh key is not added.
+ self.getPage(PREFS_SSHKEYS)
+ self.assertNotInBody("4d:42:8b:35:e5:55:71:f7:b3:0d:58:f9:b1:2c:9e:91")
+
def test_add_duplicate(self):
# Delete existing keys
user = self.app.store.get_user('admin')
@@ -112,6 +126,21 @@ def test_delete(self):
self.assertStatus('200 OK')
self.assertEqual(0, len(list(user.authorizedkeys)))
+ def test_delete_without_csrf_token(self):
+ # Given an authenticated user with an existing SSH Key
+ user = self.app.store.get_user('admin')
+ self._add_ssh_key("test@mysshkey", "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDSEN5VTn9MLituZvdYTZMbZEaMxe0UuU7BelxHkvxzSpVWtazrIBEc3KZjtVoK9F3+0kd26P4DzSQuPUl3yZDgyZZeXrF6p2GlEA7A3tPuOEsAQ9c0oTiDYktq5/Go8vD+XAZKLd//qmCWW1Jg4datkWchMKJzbHUgBrBH015FDbGvGDWYTfVyb8I9H+LQ0GmbTHsuTu63DhPODncMtWPuS9be/flb4EEojMIx5Vce0SNO9Eih38W7jTvNWxZb75k5yfPJxBULRnS5v/fPnDVVtD3JSGybSwKoMdsMX5iImAeNhqnvd8gBu1f0IycUQexTbJXk1rPiRcF13SjKrfXz ikus060@ikus060-t530")
+ self.assertStatus('200 OK')
+ self.assertEqual(1, len(list(user.authorizedkeys)))
+ # When trying to delete a key without valid csrf token.
+ self._delete_ssh_key("4d:42:8b:35:e5:55:71:f7:b3:0d:58:f9:b1:2c:9e:91", csrf_token='1234567890##invalid')
+ # Then form is refused.
+ self.assertStatus(200)
+ self.assertInBody('CSRF failed')
+ # Then ssh key is not delete.
+ self.getPage(PREFS_SSHKEYS)
+ self.assertInBody("4d:42:8b:35:e5:55:71:f7:b3:0d:58:f9:b1:2c:9e:91")
+
def test_delete_invalid(self):
# Delete existing keys
user = self.app.store.get_user('admin')
diff --git a/rdiffweb/templates/admin_users.html b/rdiffweb/templates/admin_users.html
index 20419ed8..52112aec 100644
--- a/rdiffweb/templates/admin_users.html
+++ b/rdiffweb/templates/admin_users.html
@@ -55,7 +55,7 @@
data-toggle="modal"
data-target="#edit-user-{{ user.username }}-modal">
{% trans %}Edit{% endtrans %}
- {{ button_confirm(label=_('Delete'), target="#delete-user-modal", action="delete", username=user.username) }}
+ {{ button_confirm(label=_('Delete'), target="#delete-user-modal", action="delete", username=user.username, csrf_token=form.csrf_token.current_token) }}
{% endfor %}
@@ -69,6 +69,7 @@
{% macro user_form(form, id=None, user=None) %}
+{{ form.csrf_token }}
{% trans %}To create an LDAP user, use the same username. You may leave the email and password empty.{% endtrans %}
@@ -151,7 +152,7 @@
id='delete-user-modal',
title=_('Delete User'),
message=_("Are you sure you want to delete this User?"),
- fields=['action', 'username'],
+ fields=['action', 'username', 'csrf_token'],
submit=_('Delete')) }}
{% endblock %}
diff --git a/rdiffweb/templates/prefs_sshkeys.html b/rdiffweb/templates/prefs_sshkeys.html
index 4fa44d3e..81482ee7 100644
--- a/rdiffweb/templates/prefs_sshkeys.html
+++ b/rdiffweb/templates/prefs_sshkeys.html
@@ -17,7 +17,7 @@
along with this program. If not, see .
#}
-
+
{% include 'message.html' %}
@@ -58,7 +58,7 @@
- {{ button_confirm(label=_('Delete'), target="#delete-sshkey-modal", action="delete", fingerprint=key.fingerprint, disabled=not is_maintainer) }}
+ {{ button_confirm(label=_('Delete'), target="#delete-sshkey-modal", action="delete", fingerprint=key.fingerprint, csrf_token=form.csrf_token.current_token, disabled=not is_maintainer) }}
{{ key.title }}
@@ -74,6 +74,7 @@
{# Dialog to create SSH key. #}
{% call modal_dialog('add-sshkey-modal',_('Add SSH key'), _('Add SSH key')) %}
+ {{ form.csrf_token }}