from django import forms from django.contrib import admin, messages from django.core.exceptions import ValidationError from django.forms import BaseInlineFormSet from django.shortcuts import redirect from django.urls import path, reverse from django.utils import timezone from .intf_sqlserver import listar_para_selecionar_colaborador from .intf_winthor import buscar_colaborador_oracle from .models import ( ConfiguracaoSGMP, HeadGestor, PessoaRM, Solicitacao, UsuarioPerfilExtra, UsuarioSistema, ) class ConfiguracaoSGMPForm(forms.ModelForm): perfis_gerenciar_permissoes = forms.MultipleChoiceField( label="Perfis com acesso à tela Gerenciar Permissões (/permissoes/)", choices=UsuarioSistema.Perfil.choices, widget=forms.CheckboxSelectMultiple, required=True, ) class Meta: model = ConfiguracaoSGMP fields = ("perfis_gerenciar_permissoes",) def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) if self.instance.pk: v = self.instance.perfis_gerenciar_permissoes if v is not None: self.initial["perfis_gerenciar_permissoes"] = list(v) else: self.initial.setdefault( "perfis_gerenciar_permissoes", [UsuarioSistema.Perfil.ADMIN], ) def clean_perfis_gerenciar_permissoes(self): data = self.cleaned_data.get("perfis_gerenciar_permissoes") or [] if not data: raise ValidationError("Selecione ao menos um perfil.") valid = {c[0] for c in UsuarioSistema.Perfil.choices} cleaned = [x for x in data if x in valid] if not cleaned: raise ValidationError("Nenhum código de perfil válido selecionado.") return cleaned @admin.register(ConfiguracaoSGMP) class ConfiguracaoSGMPAdmin(admin.ModelAdmin): form = ConfiguracaoSGMPForm list_display = ("__str__",) def has_add_permission(self, request): return not ConfiguracaoSGMP.objects.filter(pk=1).exists() def has_delete_permission(self, request, obj=None): return False def changelist_view(self, request, extra_context=None): if ConfiguracaoSGMP.objects.filter(pk=1).exists(): return redirect(reverse("admin:solicitacoes_configuracaosgmp_change", args=(1,))) return super().changelist_view(request, extra_context) @admin.register(PessoaRM) class PessoaRMAdmin(admin.ModelAdmin): list_display = ( "nome", "matricula", "cargo", "setor", "centro_custo", "matricula_winthor", "sincronizado_em", ) search_fields = ( "nome", "matricula", "id_rm", "matricula_winthor", ) list_filter = ( "setor", "cargo", ) readonly_fields = ( "id_rm", "sincronizado_em", "criado_em", "atualizado_em", ) # Diz ao Django para usar um template customizado change_list_template = "admin/solicitacoes/pessoarm/pessoarm_changelist.html" def get_urls(self): urls = super().get_urls() custom_urls = [ path( "sync-rm/", self.admin_site.admin_view(self.sync_rm), name="pessoarm-sync-rm", ), ] return custom_urls + urls def sync_rm(self, request): dados_rm = listar_para_selecionar_colaborador() criados = 0 atualizados = 0 winthor_ok = 0 winthor_erro = 0 winthor_sem_cpf = 0 agora = timezone.now() for row in dados_rm: id_rm = f"{row['CODCOLIGADA']}-{row['CHAPA']}" matricula_winthor = None # ========================= # Integração Winthor (CPF) # ========================= cpf = row.get("CPF") if cpf: try: dados_w = buscar_colaborador_oracle(cpf) if dados_w and dados_w.get("matricula"): matricula_winthor = dados_w["matricula"] winthor_ok += 1 else: winthor_erro += 1 except Exception: winthor_erro += 1 else: winthor_sem_cpf += 1 # ========================= # Sync RM (sempre executa) # ========================= _, created = PessoaRM.objects.update_or_create( id_rm=id_rm, defaults={ "matricula": row["CHAPA"], "nome": row["NOME"], "cargo": row["FUNCAO"], "setor": row["SECAO"], "centro_custo": row["CODSECAO"], "cpf": row["CPF"], "data_admissao": row["DATAADMISSAO"], "situacao": row["CODSITUACAO"], "cod_funcao": row["CODFUNCAO"], "salario": row["SALARIO"], "cod_sindicato": row["CODSINDICATO"], "saldo_banco_horas_minutos": row["SALDO_MINUTOS"], "inicio_periodo_banco_horas": row["INICIOPER"], "fim_periodo_banco_horas": row["FIMPER"], "matricula_winthor": matricula_winthor, "sincronizado_em": agora, }, ) if created: criados += 1 else: atualizados += 1 # ========================= # Mensagens finais # ========================= messages.success( request, ( "Sincronização RM concluída com sucesso. " f"Criados: {criados} | Atualizados: {atualizados}" ), ) if winthor_ok: messages.info( request, f"Winthor: {winthor_ok} matrícula(s) sincronizada(s) com sucesso." ) if winthor_sem_cpf: messages.warning( request, f"Winthor: {winthor_sem_cpf} colaborador(es) sem CPF — ignorados." ) if winthor_erro: messages.warning( request, f"Winthor: {winthor_erro} falha(s) ao buscar matrícula." ) return redirect("..") class UsuarioPerfilExtraInlineFormSet(BaseInlineFormSet): def clean(self): super().clean() vistos = set() principal = self.data.get("perfil") or getattr(self.instance, "perfil", None) for form in self.forms: if not hasattr(form, "cleaned_data"): continue if form.cleaned_data.get("DELETE"): continue perfil_extra = form.cleaned_data.get("perfil") if not perfil_extra: continue if perfil_extra == principal: raise ValidationError( "Perfil extra não pode ser igual ao perfil principal." ) if perfil_extra in vistos: raise ValidationError(f"Perfil extra duplicado: {perfil_extra}.") vistos.add(perfil_extra) class UsuarioPerfilExtraInline(admin.TabularInline): model = UsuarioPerfilExtra formset = UsuarioPerfilExtraInlineFormSet extra = 0 fields = ("perfil", "criado_em") readonly_fields = ("criado_em",) @admin.register(UsuarioSistema) class UsuarioSistemaAdmin(admin.ModelAdmin): list_display = ( "nome", "matricula", "perfil", "perfis_ativos_display", "ativo", "criado_em", ) list_filter = ( "perfil", "ativo", ) search_fields = ( "nome", "matricula", ) readonly_fields = ( "criado_em", "atualizado_em", ) inlines = (UsuarioPerfilExtraInline,) fieldsets = ( ("Informações Básicas", { "fields": ("matricula", "nome", "ativo") }), ("Permissões", { "fields": ("perfil",) }), ("Auditoria", { "fields": ("criado_em", "atualizado_em"), "classes": ("collapse",) }), ) def get_queryset(self, request): qs = super().get_queryset(request) return qs.prefetch_related("perfis_extras") @admin.display(description="Perfis ativos") def perfis_ativos_display(self, obj): codigos = obj.perfis_ativos() labels = dict(UsuarioSistema.Perfil.choices) resolved = [str(labels.get(c, c)) for c in codigos] return ", ".join(resolved) @admin.register(UsuarioPerfilExtra) class UsuarioPerfilExtraAdmin(admin.ModelAdmin): list_display = ("usuario", "perfil", "criado_em") list_filter = ("perfil",) search_fields = ("usuario__nome", "usuario__matricula") autocomplete_fields = ("usuario",) @admin.register(HeadGestor) class HeadGestorAdmin(admin.ModelAdmin): list_display = ("head", "gestor") list_filter = ("head",) search_fields = ("head__nome", "gestor__nome", "head__matricula", "gestor__matricula") autocomplete_fields = ("head", "gestor") @admin.register(Solicitacao) class SolicitacaoAdmin(admin.ModelAdmin): list_display = [ "id", "tipo", "status", "funcionario", "solicitante", "criado_em", ] list_filter = ("tipo", "status", "funcionario", "solicitante") search_fields = ("id", "tipo", "status", "funcionario", "solicitante") readonly_fields = ("id", "criado_em", "atualizado_em") fieldsets = ( ("Informações Básicas", { "fields": ("id", "tipo", "status", "funcionario", "solicitante") }), ("Auditoria", { "fields": ("criado_em", "atualizado_em"), "classes": ("collapse",) }), )