from django.contrib.auth import authenticate, login, logout from django.core.exceptions import ValidationError as DjangoValidationError from django.db import IntegrityError, transaction from django.db.models import Q from rest_framework.decorators import action from rest_framework.exceptions import NotFound, PermissionDenied, ValidationError from rest_framework.permissions import AllowAny, IsAuthenticated, IsAuthenticatedOrReadOnly from rest_framework.response import Response from rest_framework.status import HTTP_204_NO_CONTENT, HTTP_400_BAD_REQUEST, HTTP_401_UNAUTHORIZED, HTTP_403_FORBIDDEN, HTTP_404_NOT_FOUND, HTTP_200_OK, HTTP_201_CREATED from rest_framework.viewsets import ModelViewSet, ReadOnlyModelViewSet from apps.accounts.models import Invite, Organization, Role, User from apps.accounts.permissions import CanManageOrganization, IsOrganizationOwnerOrMember, can_manage_organization from apps.accounts.serializers import InviteSerializer, OrganizationSerializer, RoleSerializer, UserSerializer class UserViewSet(ReadOnlyModelViewSet): queryset = User.objects.all() serializer_class = UserSerializer permission_classes = [IsAuthenticatedOrReadOnly] lookup_field = 'uuid' @action(detail=False, methods=['post'], permission_classes=[AllowAny]) def login(self, request): email_address = request.data.get('email_address') password = request.data.get('password') if not email_address or not password: return Response({'error': 'Email and password are required'}, status=HTTP_400_BAD_REQUEST) email_address = User.objects.normalize_email(email_address) user = authenticate(request, username=email_address, password=password) if user is None: return Response({'error': 'Invalid credentials'}, status=HTTP_401_UNAUTHORIZED) login(request, user) return Response({'user': UserSerializer(user).data, 'message': 'Login successful', 'success': True}, status=HTTP_200_OK) @action(detail=False, methods=['post'], permission_classes=[IsAuthenticated]) def logout(self, request): logout(request) return Response({'message': 'Logout successful', 'success': True}, status=HTTP_200_OK) @action(detail=False, methods=['get'], permission_classes=[IsAuthenticated]) def me(self, request): user_data = UserSerializer(request.user).data user_data['success'] = True return Response(user_data) @action(detail=False, methods=['get'], permission_classes=[AllowAny]) def session(self, request): return Response({'isAuthenticated': request.user.is_authenticated, 'isStaff': request.user.is_staff if request.user.is_authenticated else False}) @action(detail=False, methods=['post'], permission_classes=[AllowAny]) def signup(self, request): data = request.data email_address = data.get('email_address') if not email_address: return Response({'detail': 'Email address is required.', 'success': False}, status=HTTP_400_BAD_REQUEST) email_address = User.objects.normalize_email(email_address) if User.objects.filter(email_address=email_address).exists(): return Response({'detail': 'Email address already exists.', 'success': False}, status=HTTP_400_BAD_REQUEST) if not data.get('first_name') or not data.get('last_name'): return Response({'detail': 'First and last name(s) must be provided.', 'success': False}, status=HTTP_400_BAD_REQUEST) if type(manager:=data.get('manager')) is not bool: if manager in ['true', 'True']: manager = True elif manager in ['false', 'False']: manager = False else: return Response({'detail': '"manager" field must be a boolean value.', 'success': False}, status=HTTP_400_BAD_REQUEST) if data.get('password') != data.get('confirm_password'): return Response({'detail': 'Passwords do not match.', 'success': False}, status=HTTP_400_BAD_REQUEST) try: user = User.objects.create_user( email_address=email_address, password=data.get('password'), first_name=data.get('first_name'), last_name=data.get('last_name'), date_of_birth=data.get('date_of_birth'), is_manager=manager, ) return Response({'detail': 'User account created successfully.', 'success': True}, status=HTTP_201_CREATED) except (ValueError, TypeError, DjangoValidationError, IntegrityError) as e: return Response({'detail': str(e), 'success': False}, status=HTTP_400_BAD_REQUEST) @action(detail=False, methods=['post'], permission_classes=[IsAuthenticated]) def change_password(self, request): data = request.data required_fields = ['old_password', 'password', 'confirm_password'] for field in required_fields: if not data.get(field): return Response({'detail': f'"{field}" not provided', 'success': False}, status=HTTP_400_BAD_REQUEST) if data.get('password') != data.get('confirm_password'): return Response({'detail': 'Passwords do not match', 'success': False}, status=HTTP_400_BAD_REQUEST) user = request.user if not user.check_password(data.get('old_password')): return Response({'detail': 'Old password is incorrect', 'success': False}, status=HTTP_401_UNAUTHORIZED) user.set_password(data.get('password')) user.save() return Response({'detail': 'Password changed successfully', 'success': True}, status=HTTP_200_OK) class OrganizationViewSet(ModelViewSet): queryset = Organization.objects.all() serializer_class = OrganizationSerializer permission_classes = [IsAuthenticated] lookup_field = 'uuid' def get_permissions(self): permissions = super().get_permissions() if self.action in ['retrieve', 'update', 'partial_update', 'destroy', 'leave', 'list_members', 'remove_member']: return [*permissions, IsOrganizationOwnerOrMember()] return permissions def get_queryset(self): return Organization.objects.filter( Q(owner=self.request.user) | Q(members=self.request.user) ).distinct() def perform_create(self, serializer): organization = serializer.save(owner=self.request.user) organization.members.add(self.request.user) def update(self, request, *args, **kwargs): return super().update(request, *args, **kwargs) def destroy(self, request, *args, **kwargs): return super().destroy(request, *args, **kwargs) @action(detail=True, methods=['post'], url_path='leave') def leave(self, request, uuid=None): organization: Organization = self.get_object() if organization.owner == request.user: other_members = organization.members.exclude(uuid=request.user.uuid) if other_members.exists(): return Response( { 'error': ( 'Owner cannot leave while other members/managers exist. ' 'Remove all other members first.' ) }, status=HTTP_400_BAD_REQUEST, ) organization.delete() return Response({'message': 'Organization deleted. Owner has left successfully.'}, status=HTTP_200_OK) if not organization.members.filter(uuid=request.user.uuid).exists(): return Response({'error': 'Not a member'}, status=HTTP_400_BAD_REQUEST) roles = Role.objects.filter(organization=organization, members=request.user) for role in roles: role.members.remove(request.user) organization.members.remove(request.user) return Response({'message': 'Left organization'}, status=HTTP_200_OK) @action(detail=True, methods=['get'], url_path='members') def list_members(self, request, uuid=None): organization = self.get_object() serializer = UserSerializer(organization.members.all(), many=True) return Response(serializer.data) @action(detail=True, methods=['post'], url_path=r'member/(?P[0-9a-f-]{36})/remove') def remove_member(self, request, uuid=None, user_uuid=None): organization: Organization = self.get_object() if str(organization.owner.uuid) == str(user_uuid): return Response({'error': 'Cannot remove owner'}, status=HTTP_403_FORBIDDEN) user_to_remove = organization.members.filter(uuid=user_uuid).first() if not user_to_remove: return Response({'error': 'Not found'}, status=HTTP_404_NOT_FOUND) organization.members.remove(user_to_remove) return Response({'message': 'Removed'}, status=HTTP_200_OK) class InviteViewSet(ModelViewSet): queryset = Invite.objects.all() serializer_class = InviteSerializer permission_classes = [IsAuthenticated] lookup_field = 'uuid' def _get_organization_uuid(self, request): organization_uuid = request.query_params.get('organization_uuid') if organization_uuid in (None, ''): organization_uuid = request.data.get('organization_uuid') return organization_uuid def get_permissions(self): permissions = super().get_permissions() if self.action in ['destroy']: return [*permissions, CanManageOrganization()] return permissions def get_queryset(self): user = self.request.user queryset = Invite.objects.filter( Q(organization__owner=user) | Q(organization__members=user) ).distinct().order_by('-created_at') organization_uuid = self._get_organization_uuid(self.request) if organization_uuid: queryset = queryset.filter(organization__uuid=organization_uuid) return queryset def create(self, request, *args, **kwargs): organization_uuid = self._get_organization_uuid(request) if not organization_uuid: raise ValidationError({'organization_uuid': 'organization_uuid is required'}) organization = Organization.objects.filter(uuid=organization_uuid).filter( Q(owner=request.user) | Q(members=request.user) ).first() if not organization: raise NotFound('Organization not found') if not can_manage_organization(request.user, organization): raise PermissionDenied('Only organization owner or managers can create invites') max_uses = request.query_params.get('max_uses') or request.data.get('max_uses', 1) try: max_uses = int(max_uses) except (TypeError, ValueError): raise ValidationError({'max_uses': 'max_uses must be an integer'}) if max_uses < 1 or max_uses > 1000: raise ValidationError({'max_uses': 'max_uses must be between 1 and 1000'}) invitation = Invite.objects.create( organization=organization, created_by=request.user, max_uses=max_uses, ) serializer = self.get_serializer(invitation) return Response(serializer.data, status=HTTP_201_CREATED) def destroy(self, request, *args, **kwargs): invite = self.get_object() invite.is_active = False invite.save(update_fields=['is_active', 'updated_at']) return Response({'message': 'Invitation successfully revoked'}, status=HTTP_200_OK) @action(detail=False, methods=['post'], url_path='join') def join(self, request): invite_uuid = request.query_params.get('invite_uuid') or request.data.get('invite_uuid') if not invite_uuid: return Response({'error': 'invite_uuid is required'}, status=HTTP_400_BAD_REQUEST) with transaction.atomic(): try: invitation = Invite.objects.select_for_update().select_related('organization').get(uuid=invite_uuid) except Invite.DoesNotExist: return Response({'error': 'Not Found'}, status=HTTP_404_NOT_FOUND) if not invitation.is_valid(): return Response({'error': 'Invalid or expired invitation'}, status=HTTP_400_BAD_REQUEST) organization = invitation.organization if organization.members.filter(uuid=request.user.uuid).exists(): return Response({'error': 'Already a member'}, status=HTTP_403_FORBIDDEN) organization.members.add(request.user) invitation.uses += 1 if invitation.uses >= invitation.max_uses: invitation.is_active = False invitation.save(update_fields=['uses', 'is_active', 'updated_at']) return Response({ 'message': 'Joined', 'organization': OrganizationSerializer(organization).data, }, status=HTTP_200_OK) class RoleViewSet(ModelViewSet): queryset = Role.objects.all() serializer_class = RoleSerializer permission_classes = [IsAuthenticated] lookup_field = 'uuid' def _get_organization_uuid(self, request): organization_uuid = request.query_params.get('organization_uuid') if organization_uuid in (None, ''): organization_uuid = request.data.get('organization_uuid') return organization_uuid def get_permissions(self): permissions = super().get_permissions() if self.action in ['destroy']: return [*permissions, CanManageOrganization()] return permissions def get_queryset(self): user = self.request.user queryset = Role.objects.filter( Q(organization__owner=user) | Q(organization__members=user) ).distinct().order_by('name') organization_uuid = self._get_organization_uuid(self.request) if organization_uuid: queryset = queryset.filter(organization__uuid=organization_uuid) return queryset def create(self, request, *args, **kwargs): organization_uuid = self._get_organization_uuid(request) if not organization_uuid: raise ValidationError({'organization_uuid': 'organization_uuid is required'}) organization = Organization.objects.filter(uuid=organization_uuid).filter( Q(owner=request.user) | Q(members=request.user) ).first() if not organization: raise NotFound('Organization not found') if not can_manage_organization(request.user, organization): raise PermissionDenied('Only organization owner or managers can create roles') name = (request.data.get('name') or '').strip() description = (request.data.get('description') or '').strip() if not name: raise ValidationError({'name': 'Role name is required'}) if organization.roles.filter(name__iexact=name).exists(): raise ValidationError({'name': 'A role with this name already exists in this organization'}) role = Role.objects.create(name=name, description=description, organization=organization) serializer = self.get_serializer(role) return Response(serializer.data, status=HTTP_201_CREATED) def destroy(self, request, *args, **kwargs): role = self.get_object() role.delete() return Response(status=HTTP_204_NO_CONTENT) @action(detail=False, methods=['get'], url_path='mine') def mine(self, request): roles = Role.objects.filter(members=request.user).distinct() serializer = self.get_serializer(roles, many=True) return Response(serializer.data) @action(detail=True, methods=['post'], url_path='join') def join(self, request, uuid=None): role = self.get_object() organization = role.organization is_owner = organization.owner == request.user is_member = organization.members.filter(id=request.user.id).exists() if not (is_owner or is_member): raise PermissionDenied('Not a member of this organization') if role.members.filter(id=request.user.id).exists(): return Response({'message': 'Already a member of this role'}, status=HTTP_200_OK) role.members.add(request.user) return Response({'message': 'Joined role successfully'}, status=HTTP_200_OK)