Fix security issues: image validation, email masking, quantity limits, min length
- #76: Add file type validation for product images (Media severity) - #75: Mask emails in audit logs to prevent information leakage (Media severity) - #74: Add max value validator to quantity fields (Low severity) - #73: Add min length validation to password fields (Low severity)
This commit is contained in:
@@ -1,7 +1,18 @@
|
|||||||
from django import forms
|
from django import forms
|
||||||
from django.core.exceptions import ValidationError
|
from django.core.exceptions import ValidationError
|
||||||
|
from django.core.validators import FileExtensionValidator, MinLengthValidator, MaxLengthValidator
|
||||||
from .models import Category
|
from .models import Category
|
||||||
|
|
||||||
|
ALLOWED_IMAGE_EXTENSIONS = ['jpg', 'jpeg', 'png', 'gif', 'webp']
|
||||||
|
ALLOWED_MIME_TYPES = ['image/jpeg', 'image/png', 'image/gif', 'image/webp']
|
||||||
|
|
||||||
|
def validate_image_file(value):
|
||||||
|
ext = value.name.split('.')[-1].lower()
|
||||||
|
if ext not in ALLOWED_IMAGE_EXTENSIONS:
|
||||||
|
raise ValidationError(f'Tipo de archivo no permitido. Allowed: {", ".join(ALLOWED_IMAGE_EXTENSIONS)}')
|
||||||
|
if hasattr(value, 'content_type') and value.content_type not in ALLOWED_MIME_TYPES:
|
||||||
|
raise ValidationError(f'Tipo MIME no permitido. Allowed: {", ".join(ALLOWED_MIME_TYPES)}')
|
||||||
|
|
||||||
|
|
||||||
class ProductForm(forms.Form):
|
class ProductForm(forms.Form):
|
||||||
name = forms.CharField(
|
name = forms.CharField(
|
||||||
@@ -61,6 +72,7 @@ class ProductForm(forms.Form):
|
|||||||
primary_image = forms.ImageField(
|
primary_image = forms.ImageField(
|
||||||
label="Imagen Principal",
|
label="Imagen Principal",
|
||||||
required = False,
|
required = False,
|
||||||
|
validators=[validate_image_file],
|
||||||
widget = forms.ClearableFileInput(
|
widget = forms.ClearableFileInput(
|
||||||
attrs = {
|
attrs = {
|
||||||
'class': 'form-control',
|
'class': 'form-control',
|
||||||
@@ -108,6 +120,7 @@ class ProductEditForm(forms.Form):
|
|||||||
primary_image = forms.ImageField(
|
primary_image = forms.ImageField(
|
||||||
label="Imagen Principal (opcional)",
|
label="Imagen Principal (opcional)",
|
||||||
required=False,
|
required=False,
|
||||||
|
validators=[validate_image_file],
|
||||||
widget=forms.ClearableFileInput(attrs={'class': 'form-control', 'accept': 'image/*'})
|
widget=forms.ClearableFileInput(attrs={'class': 'form-control', 'accept': 'image/*'})
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -116,6 +129,7 @@ class SecondaryImageForm(forms.Form):
|
|||||||
image = forms.ImageField(
|
image = forms.ImageField(
|
||||||
label="Seleccionar Imagen",
|
label="Seleccionar Imagen",
|
||||||
required = True,
|
required = True,
|
||||||
|
validators=[validate_image_file],
|
||||||
widget = forms.ClearableFileInput(
|
widget = forms.ClearableFileInput(
|
||||||
attrs = {
|
attrs = {
|
||||||
'class': 'form-control',
|
'class': 'form-control',
|
||||||
@@ -190,7 +204,9 @@ class UserRegisterForm(forms.Form):
|
|||||||
password = forms.CharField(
|
password = forms.CharField(
|
||||||
label = "Contraseña",
|
label = "Contraseña",
|
||||||
max_length = 255,
|
max_length = 255,
|
||||||
|
min_length = 8,
|
||||||
required = True,
|
required = True,
|
||||||
|
validators=[MinLengthValidator(8)],
|
||||||
widget = forms.PasswordInput(
|
widget = forms.PasswordInput(
|
||||||
attrs = {
|
attrs = {
|
||||||
'class': 'form-control'
|
'class': 'form-control'
|
||||||
@@ -200,7 +216,9 @@ class UserRegisterForm(forms.Form):
|
|||||||
password_confirm = forms.CharField(
|
password_confirm = forms.CharField(
|
||||||
label = "Verificar Contraseña",
|
label = "Verificar Contraseña",
|
||||||
max_length = 255,
|
max_length = 255,
|
||||||
|
min_length = 8,
|
||||||
required = True,
|
required = True,
|
||||||
|
validators=[MinLengthValidator(8)],
|
||||||
widget = forms.PasswordInput(
|
widget = forms.PasswordInput(
|
||||||
attrs = {
|
attrs = {
|
||||||
'class': 'form-control'
|
'class': 'form-control'
|
||||||
|
|||||||
+11
-3
@@ -3,10 +3,13 @@ from __future__ import annotations
|
|||||||
import unicodedata
|
import unicodedata
|
||||||
from django.db import models
|
from django.db import models
|
||||||
from django.contrib.auth.models import User, AbstractUser
|
from django.contrib.auth.models import User, AbstractUser
|
||||||
|
from django.core.validators import MaxValueValidator
|
||||||
from django.utils.crypto import get_random_string
|
from django.utils.crypto import get_random_string
|
||||||
from .vars import VAT_RATE, TRANSACTION_CODE_PREFIX, TRANSACTION_CODE_LENGTH, TRANSACTION_CODE_ALPHABET
|
from .vars import VAT_RATE, TRANSACTION_CODE_PREFIX, TRANSACTION_CODE_LENGTH, TRANSACTION_CODE_ALPHABET
|
||||||
import random, string
|
import random, string
|
||||||
|
|
||||||
|
MAX_QUANTITY = 9999
|
||||||
|
|
||||||
|
|
||||||
def generate_transaction_code() -> str:
|
def generate_transaction_code() -> str:
|
||||||
while True:
|
while True:
|
||||||
@@ -154,11 +157,16 @@ class StockReservation(models.Model):
|
|||||||
class StockReservationItem(models.Model):
|
class StockReservationItem(models.Model):
|
||||||
reservation = models.ForeignKey(StockReservation, on_delete=models.CASCADE, related_name="items")
|
reservation = models.ForeignKey(StockReservation, on_delete=models.CASCADE, related_name="items")
|
||||||
product = models.ForeignKey(Product, on_delete=models.CASCADE, related_name="stock_reservation_items")
|
product = models.ForeignKey(Product, on_delete=models.CASCADE, related_name="stock_reservation_items")
|
||||||
quantity = models.PositiveIntegerField(default=1)
|
quantity = models.PositiveIntegerField(default=1, validators=[MaxValueValidator(MAX_QUANTITY)])
|
||||||
|
|
||||||
class Meta:
|
class Meta:
|
||||||
unique_together = ("reservation", "product")
|
unique_together = ("reservation", "product")
|
||||||
|
|
||||||
|
def clean(self):
|
||||||
|
from django.core.exceptions import ValidationError
|
||||||
|
if self.quantity is not None and self.quantity > MAX_QUANTITY:
|
||||||
|
raise ValidationError(f'La cantidad no puede exceder {MAX_QUANTITY} unidades.')
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
return f"{self.quantity}x {self.product.name} (reserva {self.reservation_id})"
|
return f"{self.quantity}x {self.product.name} (reserva {self.reservation_id})"
|
||||||
|
|
||||||
@@ -190,7 +198,7 @@ class Cart(models.Model):
|
|||||||
class CartItem(models.Model):
|
class CartItem(models.Model):
|
||||||
cart = models.ForeignKey(Cart, on_delete=models.CASCADE, related_name='items')
|
cart = models.ForeignKey(Cart, on_delete=models.CASCADE, related_name='items')
|
||||||
product = models.ForeignKey(Product, on_delete=models.CASCADE)
|
product = models.ForeignKey(Product, on_delete=models.CASCADE)
|
||||||
quantity = models.PositiveIntegerField(default=1)
|
quantity = models.PositiveIntegerField(default=1, validators=[MaxValueValidator(MAX_QUANTITY)])
|
||||||
added_at = models.DateTimeField(auto_now_add=True)
|
added_at = models.DateTimeField(auto_now_add=True)
|
||||||
|
|
||||||
class Meta:
|
class Meta:
|
||||||
@@ -265,7 +273,7 @@ class OrderItem(models.Model):
|
|||||||
product = models.ForeignKey(Product, on_delete=models.SET_NULL, null=True, blank=True)
|
product = models.ForeignKey(Product, on_delete=models.SET_NULL, null=True, blank=True)
|
||||||
product_name = models.CharField(max_length=200, default="")
|
product_name = models.CharField(max_length=200, default="")
|
||||||
seller = models.ForeignKey(User, on_delete=models.SET_NULL, null=True, blank=True, related_name='order_items_to_fulfill')
|
seller = models.ForeignKey(User, on_delete=models.SET_NULL, null=True, blank=True, related_name='order_items_to_fulfill')
|
||||||
quantity = models.PositiveIntegerField(default=1)
|
quantity = models.PositiveIntegerField(default=1, validators=[MaxValueValidator(MAX_QUANTITY)])
|
||||||
unit_price = models.FloatField(default=0)
|
unit_price = models.FloatField(default=0)
|
||||||
total_price = models.FloatField(default=0)
|
total_price = models.FloatField(default=0)
|
||||||
status = models.CharField(max_length=20, choices=STATUS_CHOICES, default=STATUS_PENDING)
|
status = models.CharField(max_length=20, choices=STATUS_CHOICES, default=STATUS_PENDING)
|
||||||
|
|||||||
+18
-7
@@ -43,6 +43,17 @@ STOCK_RESERVATION_SESSION_KEY = "stock_reservation_id"
|
|||||||
STOCK_RESERVATION_PAYMENT_SESSION_KEY = "stock_reservation_payment_method"
|
STOCK_RESERVATION_PAYMENT_SESSION_KEY = "stock_reservation_payment_method"
|
||||||
|
|
||||||
|
|
||||||
|
def _mask_email(email: str) -> str:
|
||||||
|
if not email or '@' not in email:
|
||||||
|
return "***"
|
||||||
|
local, domain = email.rsplit('@', 1)
|
||||||
|
if len(local) <= 2:
|
||||||
|
masked_local = local[0] + '*'
|
||||||
|
else:
|
||||||
|
masked_local = local[0] + '*' * (len(local) - 2) + local[-1]
|
||||||
|
return f"{masked_local}@{domain}"
|
||||||
|
|
||||||
|
|
||||||
def _invalidate_product_cache(product_ids):
|
def _invalidate_product_cache(product_ids):
|
||||||
unique_product_ids = {product_id for product_id in product_ids if product_id is not None}
|
unique_product_ids = {product_id for product_id in product_ids if product_id is not None}
|
||||||
if not unique_product_ids:
|
if not unique_product_ids:
|
||||||
@@ -235,7 +246,7 @@ def login(request: HttpRequest):
|
|||||||
user: User = User.objects.get(email=email)
|
user: User = User.objects.get(email=email)
|
||||||
username = user.username
|
username = user.username
|
||||||
except User.DoesNotExist:
|
except User.DoesNotExist:
|
||||||
audit_logger.warning("LOGIN FAILED email=%s reason=user_not_found ip=%s", email, client_ip)
|
audit_logger.warning("LOGIN FAILED email=%s reason=user_not_found ip=%s", _mask_email(email), client_ip)
|
||||||
messages.error(request, "El email o la contraseña es incorrecta")
|
messages.error(request, "El email o la contraseña es incorrecta")
|
||||||
return render(request, "tienda/login.html", {"form": form})
|
return render(request, "tienda/login.html", {"form": form})
|
||||||
if user.registration_status == User.RegisterStatus.BANNED:
|
if user.registration_status == User.RegisterStatus.BANNED:
|
||||||
@@ -254,7 +265,7 @@ def login(request: HttpRequest):
|
|||||||
logins = int(data)
|
logins = int(data)
|
||||||
|
|
||||||
if logins >= 5:
|
if logins >= 5:
|
||||||
audit_logger.info("LOGIN FAILED email=%s reason=rate_limited", email)
|
audit_logger.info("LOGIN FAILED email=%s reason=rate_limited", _mask_email(email))
|
||||||
messages.error(request, "Has sufrido de Rate Limit por fallar 5 veces la contraseña")
|
messages.error(request, "Has sufrido de Rate Limit por fallar 5 veces la contraseña")
|
||||||
return render(request, "tienda/login.html", {"form": form})
|
return render(request, "tienda/login.html", {"form": form})
|
||||||
logins+=1
|
logins+=1
|
||||||
@@ -262,7 +273,7 @@ def login(request: HttpRequest):
|
|||||||
messages.error(request, "El email o la contraseña es incorrecta")
|
messages.error(request, "El email o la contraseña es incorrecta")
|
||||||
return render(request, "tienda/login.html", {"form": form})
|
return render(request, "tienda/login.html", {"form": form})
|
||||||
if user.registration_status == User.RegisterStatus.CONFIRMATION_REQUIRED:
|
if user.registration_status == User.RegisterStatus.CONFIRMATION_REQUIRED:
|
||||||
audit_logger.info("LOGIN_FAILED email=%s reason=not_verified", email)
|
audit_logger.info("LOGIN_FAILED email=%s reason=not_verified", _mask_email(email))
|
||||||
messages.error(request, "No se puede iniciar sesión porque no has verificado tu cuenta, comprueba tu email. Si eliminaste el email pero querias verificarte, contacta con el soporte tecnico")
|
messages.error(request, "No se puede iniciar sesión porque no has verificado tu cuenta, comprueba tu email. Si eliminaste el email pero querias verificarte, contacta con el soporte tecnico")
|
||||||
return render(request, "tienda/login.html", {"form": form})
|
return render(request, "tienda/login.html", {"form": form})
|
||||||
auth_login(request, user)
|
auth_login(request, user)
|
||||||
@@ -272,7 +283,7 @@ def login(request: HttpRequest):
|
|||||||
else:
|
else:
|
||||||
request.session.set_expiry(1209600)
|
request.session.set_expiry(1209600)
|
||||||
|
|
||||||
audit_logger.info("LOGIN_SUCCESS user_id=%s email=%s ip=%s remember=%s", user.id, user.email, client_ip, bool(remember))
|
audit_logger.info("LOGIN_SUCCESS user_id=%s email=%s ip=%s remember=%s", user.id, _mask_email(user.email), client_ip, bool(remember))
|
||||||
tasks.enviar_correo_bienvenida.delay(user.email, f"{user.first_name} {user.last_name}")
|
tasks.enviar_correo_bienvenida.delay(user.email, f"{user.first_name} {user.last_name}")
|
||||||
messages.success(request, f"¡Bienvenido {user.first_name or user.username}!")
|
messages.success(request, f"¡Bienvenido {user.first_name or user.username}!")
|
||||||
return redirect("index")
|
return redirect("index")
|
||||||
@@ -332,7 +343,7 @@ def register(request: HttpRequest):
|
|||||||
|
|
||||||
# Validación email
|
# Validación email
|
||||||
if User.objects.filter(email=email).exists():
|
if User.objects.filter(email=email).exists():
|
||||||
audit_logger.warning("REGISTER_FAILED email=%s reason=email_exists ip=%s", email, client_ip)
|
audit_logger.warning("REGISTER_FAILED email=%s reason=email_exists ip=%s", _mask_email(email), client_ip)
|
||||||
messages.error(request, "Ya existe un usuario con este correo electrónico")
|
messages.error(request, "Ya existe un usuario con este correo electrónico")
|
||||||
return render(request, "tienda/register.html", {"form":form})
|
return render(request, "tienda/register.html", {"form":form})
|
||||||
|
|
||||||
@@ -352,7 +363,7 @@ def register(request: HttpRequest):
|
|||||||
"REGISTER_SUCCESS user_id=%s username=%s email=%s ip=%s",
|
"REGISTER_SUCCESS user_id=%s username=%s email=%s ip=%s",
|
||||||
user.id,
|
user.id,
|
||||||
user.username,
|
user.username,
|
||||||
user.email,
|
_mask_email(user.email),
|
||||||
client_ip,
|
client_ip,
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -370,7 +381,7 @@ def logout(request: HttpRequest):
|
|||||||
email = request.user.email if request.user.is_authenticated else None
|
email = request.user.email if request.user.is_authenticated else None
|
||||||
client_ip = _get_client_ip(request)
|
client_ip = _get_client_ip(request)
|
||||||
auth_logout(request)
|
auth_logout(request)
|
||||||
audit_logger.info("LOGOUT user_id=%s email=%s ip=%s", user_id, email, client_ip)
|
audit_logger.info("LOGOUT user_id=%s email=%s ip=%s", user_id, _mask_email(email) if email else "***", client_ip)
|
||||||
messages.success(request, "Has cerrado sesión exitosamente.")
|
messages.success(request, "Has cerrado sesión exitosamente.")
|
||||||
return redirect("index")
|
return redirect("index")
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user