Skip to content

Commit

Permalink
Send email notification when enabling or disabling MFA
Browse files Browse the repository at this point in the history
  • Loading branch information
ikus060 committed Oct 23, 2022
1 parent f2a32f2 commit c27c46b
Show file tree
Hide file tree
Showing 12 changed files with 132 additions and 120 deletions.
1 change: 1 addition & 0 deletions README.md
Expand Up @@ -138,6 +138,7 @@ This next release focus on two-factor-authentication as a measure to increase se
* Enforce better rate limit on login, mfa, password change and API [CVE-2022-3439](https://nvd.nist.gov/vuln/detail/CVE-2022-3439) [CVE-2022-3456](https://nvd.nist.gov/vuln/detail/CVE-2022-3456)
* Enforce 'Origin' validation [CVE-2022-3457](https://nvd.nist.gov/vuln/detail/CVE-2022-3457)
* Define idle and absolute session timeout with agressive default to protect usage on public computer [CVE-2022-3327](https://nvd.nist.gov/vuln/detail/CVE-2022-3327)
* Send email notification when enabling or disabling MFA [CVE-2022-3363](https://nvd.nist.gov/vuln/detail/CVE-2022-3363)

Breaking changes:

Expand Down
14 changes: 7 additions & 7 deletions rdiffweb/controller/page_mfa.py
Expand Up @@ -105,10 +105,10 @@ def send_code(self):
"Multi-factor authentication is enabled for your account, but your account does not have a valid email address to send the verification code to. Check your account settings with your administrator."
)
)
else:
code = cherrypy.tools.auth_mfa.generate_code()
body = self.app.templates.compile_template(
"email_mfa.html", **{"header_name": self.app.cfg.header_name, 'user': userobj, 'code': code}
)
cherrypy.engine.publish('queue_mail', to=userobj.email, subject=_("Your verification code"), message=body)
flash(_("A new verification code has been sent to your email."))
return
code = cherrypy.tools.auth_mfa.generate_code()
body = self.app.templates.compile_template(
"email_verification_code.html", **{"header_name": self.app.cfg.header_name, 'user': userobj, 'code': code}
)
cherrypy.engine.publish('queue_mail', to=userobj.email, subject=_("Your verification code"), message=body)
flash(_("A new verification code has been sent to your email."))
2 changes: 1 addition & 1 deletion rdiffweb/controller/page_pref_mfa.py
Expand Up @@ -126,7 +126,7 @@ def send_code(self):
return
code = cherrypy.tools.auth_mfa.generate_code()
body = self.app.templates.compile_template(
"email_mfa.html", **{"header_name": self.app.cfg.header_name, 'user': userobj, 'code': code}
"email_verification_code.html", **{"header_name": self.app.cfg.header_name, 'user': userobj, 'code': code}
)
cherrypy.engine.publish('queue_mail', to=userobj.email, subject=_("Your verification code"), message=body)
flash(_("A new verification code has been sent to your email."))
70 changes: 37 additions & 33 deletions rdiffweb/controller/tests/test_page_prefs_mfa.py
Expand Up @@ -15,7 +15,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from unittest.mock import MagicMock
from unittest.mock import ANY, MagicMock

import cherrypy
from parameterized import parameterized
Expand All @@ -34,47 +34,49 @@ def setUp(self):
userobj = UserObject.get_user(self.USERNAME)
userobj.email = 'admin@example.com'
userobj.add()
# Register a listener on email
self.listener = MagicMock()
cherrypy.engine.subscribe('queue_mail', self.listener.queue_email, priority=50)

def tearDown(self):
cherrypy.engine.unsubscribe('queue_mail', self.listener.queue_email)
return super().tearDown()

def _set_mfa(self, mfa):
# Define mfa for user
userobj = UserObject.get_user(self.USERNAME)
userobj.mfa = mfa
userobj.add()
# Reset mock.
self.listener.queue_email.reset_mock()
# Leave to disable mfa
if mfa == UserObject.DISABLED_MFA:
return
# Generate a code for login if required
self.listener = MagicMock()
cherrypy.engine.subscribe('queue_mail', self.listener.queue_email, priority=50)
try:
self.getPage("/mfa/")
self.assertStatus(200)
self.assertInBody("A new verification code has been sent to your email.")
# Extract code from email between <strong> and </strong>
self.listener.queue_email.assert_called_once()
message = self.listener.queue_email.call_args[1]['message']
code = message.split('<strong>', 1)[1].split('</strong>')[0]
# Login to MFA
self.getPage("/mfa/", method='POST', body={'code': code, 'submit': '1'})
self.assertStatus(303)
finally:
cherrypy.engine.unsubscribe('queue_mail', self.listener.queue_email)
self.getPage("/mfa/")
self.assertStatus(200)
self.assertInBody("A new verification code has been sent to your email.")
# Extract code from email between <strong> and </strong>
self.listener.queue_email.assert_called_once()
message = self.listener.queue_email.call_args[1]['message']
code = message.split('<strong>', 1)[1].split('</strong>')[0]
# Login to MFA
self.getPage("/mfa/", method='POST', body={'code': code, 'submit': '1'})
self.assertStatus(303)
# Clear mock.
self.listener.queue_email.reset_mock()

def _get_code(self, action):
assert action in ['enable_mfa', 'disable_mfa', 'resend_code']
# Register an email listeer to capture email send
self.listener = MagicMock()
cherrypy.engine.subscribe('queue_mail', self.listener.queue_email, priority=50)
# Query MFA page to generate a code
try:
self.getPage("/prefs/mfa", method='POST', body={action: '1'})
self.assertStatus(200)
self.assertInBody("A new verification code has been sent to your email.")
# Extract code from email between <strong> and </strong>
self.listener.queue_email.assert_called_once()
message = self.listener.queue_email.call_args[1]['message']
return message.split('<strong>', 1)[1].split('</strong>')[0]
finally:
cherrypy.engine.unsubscribe('queue_mail', self.listener.queue_email)
self.getPage("/prefs/mfa", method='POST', body={action: '1'})
self.assertStatus(200)
self.assertInBody("A new verification code has been sent to your email.")
# Extract code from email between <strong> and </strong>
self.listener.queue_email.assert_called_once()
message = self.listener.queue_email.call_args[1]['message']
self.listener.queue_email.reset_mock()
return message.split('<strong>', 1)[1].split('</strong>')[0]

def test_get(self):
# When getting the page
Expand All @@ -84,11 +86,11 @@ def test_get(self):

@parameterized.expand(
[
('enable_mfa', UserObject.DISABLED_MFA, UserObject.ENABLED_MFA),
('disable_mfa', UserObject.ENABLED_MFA, UserObject.DISABLED_MFA),
('enable_mfa', UserObject.DISABLED_MFA, UserObject.ENABLED_MFA, "Two-Factor Authentication turned on"),
('disable_mfa', UserObject.ENABLED_MFA, UserObject.DISABLED_MFA, "Two-Factor Authentication turned off"),
]
)
def test_with_valid_code(self, action, initial_mfa, expected_mfa):
def test_with_valid_code(self, action, initial_mfa, expected_mfa, expected_subject):
# Define mfa for user
self._set_mfa(initial_mfa)
# Given a user with email requesting a code
Expand All @@ -99,8 +101,10 @@ def test_with_valid_code(self, action, initial_mfa, expected_mfa):
self.assertStatus(200)
userobj = UserObject.get_user(self.USERNAME)
self.assertEqual(userobj.mfa, expected_mfa)
# Then no email get sent
# Then no verification code get sent
self.assertNotInBody("A new verification code has been sent to your email.")
# Then an email confirmation get send
self.listener.queue_email.assert_called_once_with(to=ANY, subject=expected_subject, message=ANY)
# Then next page request is still working.
self.getPage('/')
self.assertStatus(200)
Expand Down
2 changes: 1 addition & 1 deletion rdiffweb/core/config.py
Expand Up @@ -159,7 +159,7 @@ def get_parser():
'--emailsendchangednotification',
help='True to send notification when sensitive information get change in user profile.',
action='store_true',
default=False,
default=True,
)

parser.add_argument(
Expand Down
79 changes: 28 additions & 51 deletions rdiffweb/core/model/_user.py
Expand Up @@ -24,7 +24,7 @@
from sqlalchemy import Column, Integer, SmallInteger, String, and_, event, inspect, or_
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import deferred, relationship
from sqlalchemy.orm import deferred, relationship, validates
from zxcvbn import zxcvbn

import rdiffweb.tools.db # noqa
Expand Down Expand Up @@ -74,9 +74,9 @@ class UserObject(Base):
PATTERN_USERNAME = r"[a-zA-Z0-9_.\-]+$"

userid = Column('UserID', Integer, primary_key=True)
_username = Column('Username', String, nullable=False, unique=True)
username = Column('Username', String, nullable=False, unique=True)
hash_password = Column('Password', String, nullable=False, default="")
_user_root = Column('UserRoot', String, nullable=False, default="")
user_root = Column('UserRoot', String, nullable=False, default="")
_is_admin = deferred(
Column(
'IsAdmin',
Expand All @@ -86,7 +86,7 @@ class UserObject(Base):
doc="DEPRECATED This column is replaced by 'role'",
)
)
_email = Column('UserEmail', String, nullable=False, default="")
email = Column('UserEmail', String, nullable=False, default="")
restore_format = deferred(
Column(
'RestoreFormat',
Expand All @@ -96,7 +96,7 @@ class UserObject(Base):
doc="DEPRECATED This column is not used anymore",
)
)
_role = Column('role', SmallInteger, nullable=False, server_default=str(USER_ROLE))
role = Column('role', SmallInteger, nullable=False, server_default=str(USER_ROLE), default=USER_ROLE)
fullname = Column('fullname', String, nullable=False, default="")
mfa = Column('mfa', SmallInteger, nullable=False, default=DISABLED_MFA)
repo_objs = relationship(
Expand Down Expand Up @@ -129,7 +129,7 @@ def create_admin_user(cls, default_username, default_password):
userobj.add()

@classmethod
def add_user(cls, username, password=None, **attrs):
def add_user(cls, username, password=None, role=USER_ROLE, **attrs):
"""
Used to add a new user with an optional password.
"""
Expand All @@ -143,6 +143,7 @@ def add_user(cls, username, password=None, **attrs):
userobj = UserObject(
username=username,
hash_password=hash_password(password) if password else '',
role=role,
**attrs,
).add()
# Raise event
Expand Down Expand Up @@ -383,51 +384,11 @@ def set_password(self, password):
def __eq__(self, other):
return type(self) == type(other) and inspect(self).key == inspect(other).key

@hybrid_property
def username(self):
return self._username

@username.setter
def username(self, value):
oldvalue = self._username
self._username = value
if oldvalue != value:
cherrypy.engine.publish('user_attr_changed', self, {'username': (oldvalue, value)})

@hybrid_property
def role(self):
if self._role is None:
return self.USER_ROLE
return self._role

@role.setter
def role(self, value):
oldvalue = self._role
self._role = value
if oldvalue != value:
cherrypy.engine.publish('user_attr_changed', self, {'role': (oldvalue, value)})

@hybrid_property
def email(self):
return self._email

@email.setter
def email(self, value):
oldvalue = self._email
self._email = value
if oldvalue != value:
cherrypy.engine.publish('user_attr_changed', self, {'email': (oldvalue, value)})

@hybrid_property
def user_root(self):
return self._user_root

@user_root.setter
def user_root(self, value):
oldvalue = self._user_root
self._user_root = value
if oldvalue != value:
cherrypy.engine.publish('user_attr_changed', self, {'user_root': (oldvalue, value)})
@validates('username')
def validates_username(self, key, value):
if self.username:
raise ValueError('Username cannot be modified.')
return value

def validate_access_token(self, token):
"""
Expand Down Expand Up @@ -460,3 +421,19 @@ def user_after_delete(mapper, connection, target):
Publish event when user is deleted.
"""
cherrypy.engine.publish('user_deleted', target.username)


@event.listens_for(UserObject, 'after_update')
def user_attr_changed(mapper, connection, target):
changes = {}
state = inspect(target)
for attr in state.attrs:
if attr.key in ['user_root', 'email', 'role', 'mfa']:
hist = attr.load_history()
if hist.has_changes():
changes[attr.key] = (
hist.deleted[0] if len(hist.deleted) >= 1 else None,
hist.added[0] if len(hist.added) >= 1 else None,
)
if changes:
cherrypy.engine.publish('user_attr_changed', target, changes)
9 changes: 4 additions & 5 deletions rdiffweb/core/model/tests/test_user.py
Expand Up @@ -36,11 +36,6 @@


class UserObjectTest(rdiffweb.test.WebCase):

default_config = {
'email-send-changed-notification': True,
}

def _read_ssh_key(self):
"""Readthe pub key from test packages"""
filename = pkg_resources.resource_filename('rdiffweb.core.tests', 'test_publickey_ssh_rsa.pub')
Expand Down Expand Up @@ -174,12 +169,16 @@ def test_get_set(self):
user.refresh_repos()
self.listener.user_attr_changed.assert_called_with(user, {'user_root': ('', self.testcases)})
self.listener.user_attr_changed.reset_mock()
user = UserObject.get_user('larry')
user.role = UserObject.ADMIN_ROLE
user.add()
self.listener.user_attr_changed.assert_called_with(
user, {'role': (UserObject.USER_ROLE, UserObject.ADMIN_ROLE)}
)
self.listener.user_attr_changed.reset_mock()
user = UserObject.get_user('larry')
user.email = 'larry@gmail.com'
user.add()
self.listener.user_attr_changed.assert_called_with(user, {'email': ('', 'larry@gmail.com')})
self.listener.user_attr_changed.reset_mock()

Expand Down
38 changes: 25 additions & 13 deletions rdiffweb/core/notification.py
Expand Up @@ -78,19 +78,31 @@ def user_attr_changed(self, userobj, attrs={}):
return

# Leave if the mail was not changed.
if 'email' not in attrs:
return

old_email = attrs['email'][0]
if not old_email:
logger.info("can't sent mail to user [%s] without an email", userobj.username)
return

# If the email attributes was changed, send a mail notification.
body = self.app.templates.compile_template(
"email_changed.html", **{"header_name": self.app.cfg.header_name, 'user': userobj}
)
self.bus.publish('queue_mail', to=old_email, subject=_("Email address changed"), message=body)
if 'email' in attrs:
old_email = attrs['email'][0]
if not old_email:
logger.info("can't sent mail to user [%s] without an email", userobj.username)
return
# If the email attributes was changed, send a mail notification.
subject = _("Email address changed")
body = self.app.templates.compile_template(
"email_changed.html", **{"header_name": self.app.cfg.header_name, 'user': userobj}
)
self.bus.publish('queue_mail', to=old_email, subject=str(subject), message=body)

if 'mfa' in attrs:
if not userobj.email:
logger.info("can't sent mail to user [%s] without an email", userobj.username)
return
subject = (
_("Two-Factor Authentication turned off")
if userobj.mfa == UserObject.DISABLED_MFA
else _("Two-Factor Authentication turned on")
)
body = self.app.templates.compile_template(
"email_mfa.html", **{"header_name": self.app.cfg.header_name, 'user': userobj}
)
self.bus.publish('queue_mail', to=userobj.email, subject=str(subject), message=body)

def user_password_changed(self, userobj):
if not self.send_changed:
Expand Down
3 changes: 3 additions & 0 deletions rdiffweb/core/tests/test_notification.py
Expand Up @@ -118,10 +118,13 @@ def test_email_changed(self):
# Given a user with an email address
user = UserObject.get_user(self.USERNAME)
user.email = 'original_email@test.com'
user.add()
self.listener.queue_email.reset_mock()

# When updating the user's email
user = UserObject.get_user(self.USERNAME)
user.email = 'email_changed@test.com'
user.add()

# Then a email is queue to notify the user.
self.listener.queue_email.assert_called_once_with(
Expand Down

0 comments on commit c27c46b

Please sign in to comment.