sgmp/solicitacoes/admin.py

339 lines
9.8 KiB
Python

from django import forms
from django.contrib import admin, messages
from django.core.exceptions import ValidationError
from django.forms import BaseInlineFormSet
from django.shortcuts import redirect
from django.urls import path, reverse
from django.utils import timezone
from .intf_sqlserver import listar_para_selecionar_colaborador
from .intf_winthor import buscar_colaborador_oracle
from .models import (
ConfiguracaoSGMP,
HeadGestor,
PessoaRM,
Solicitacao,
UsuarioPerfilExtra,
UsuarioSistema,
)
class ConfiguracaoSGMPForm(forms.ModelForm):
perfis_gerenciar_permissoes = forms.MultipleChoiceField(
label="Perfis com acesso à tela Gerenciar Permissões (/permissoes/)",
choices=UsuarioSistema.Perfil.choices,
widget=forms.CheckboxSelectMultiple,
required=True,
)
class Meta:
model = ConfiguracaoSGMP
fields = ("perfis_gerenciar_permissoes",)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if self.instance.pk:
v = self.instance.perfis_gerenciar_permissoes
if v is not None:
self.initial["perfis_gerenciar_permissoes"] = list(v)
else:
self.initial.setdefault(
"perfis_gerenciar_permissoes",
[UsuarioSistema.Perfil.ADMIN],
)
def clean_perfis_gerenciar_permissoes(self):
data = self.cleaned_data.get("perfis_gerenciar_permissoes") or []
if not data:
raise ValidationError("Selecione ao menos um perfil.")
valid = {c[0] for c in UsuarioSistema.Perfil.choices}
cleaned = [x for x in data if x in valid]
if not cleaned:
raise ValidationError("Nenhum código de perfil válido selecionado.")
return cleaned
@admin.register(ConfiguracaoSGMP)
class ConfiguracaoSGMPAdmin(admin.ModelAdmin):
form = ConfiguracaoSGMPForm
list_display = ("__str__",)
def has_add_permission(self, request):
return not ConfiguracaoSGMP.objects.filter(pk=1).exists()
def has_delete_permission(self, request, obj=None):
return False
def changelist_view(self, request, extra_context=None):
if ConfiguracaoSGMP.objects.filter(pk=1).exists():
return redirect(reverse("admin:solicitacoes_configuracaosgmp_change", args=(1,)))
return super().changelist_view(request, extra_context)
@admin.register(PessoaRM)
class PessoaRMAdmin(admin.ModelAdmin):
list_display = (
"nome",
"matricula",
"cargo",
"setor",
"centro_custo",
"matricula_winthor",
"sincronizado_em",
)
search_fields = (
"nome",
"matricula",
"id_rm",
"matricula_winthor",
)
list_filter = (
"setor",
"cargo",
)
readonly_fields = (
"id_rm",
"sincronizado_em",
"criado_em",
"atualizado_em",
)
# Diz ao Django para usar um template customizado
change_list_template = "admin/solicitacoes/pessoarm/pessoarm_changelist.html"
def get_urls(self):
urls = super().get_urls()
custom_urls = [
path(
"sync-rm/",
self.admin_site.admin_view(self.sync_rm),
name="pessoarm-sync-rm",
),
]
return custom_urls + urls
def sync_rm(self, request):
dados_rm = listar_para_selecionar_colaborador()
criados = 0
atualizados = 0
winthor_ok = 0
winthor_erro = 0
winthor_sem_cpf = 0
agora = timezone.now()
for row in dados_rm:
id_rm = f"{row['CODCOLIGADA']}-{row['CHAPA']}"
matricula_winthor = None
# =========================
# Integração Winthor (CPF)
# =========================
cpf = row.get("CPF")
if cpf:
try:
dados_w = buscar_colaborador_oracle(cpf)
if dados_w and dados_w.get("matricula"):
matricula_winthor = dados_w["matricula"]
winthor_ok += 1
else:
winthor_erro += 1
except Exception:
winthor_erro += 1
else:
winthor_sem_cpf += 1
# =========================
# Sync RM (sempre executa)
# =========================
_, created = PessoaRM.objects.update_or_create(
id_rm=id_rm,
defaults={
"matricula": row["CHAPA"],
"nome": row["NOME"],
"cargo": row["FUNCAO"],
"setor": row["SECAO"],
"centro_custo": row["CODSECAO"],
"cpf": row["CPF"],
"data_admissao": row["DATAADMISSAO"],
"situacao": row["CODSITUACAO"],
"cod_funcao": row["CODFUNCAO"],
"salario": row["SALARIO"],
"cod_sindicato": row["CODSINDICATO"],
"saldo_banco_horas_minutos": row["SALDO_MINUTOS"],
"inicio_periodo_banco_horas": row["INICIOPER"],
"fim_periodo_banco_horas": row["FIMPER"],
"matricula_winthor": matricula_winthor,
"sincronizado_em": agora,
},
)
if created:
criados += 1
else:
atualizados += 1
# =========================
# Mensagens finais
# =========================
messages.success(
request,
(
"Sincronização RM concluída com sucesso. "
f"Criados: {criados} | Atualizados: {atualizados}"
),
)
if winthor_ok:
messages.info(
request,
f"Winthor: {winthor_ok} matrícula(s) sincronizada(s) com sucesso."
)
if winthor_sem_cpf:
messages.warning(
request,
f"Winthor: {winthor_sem_cpf} colaborador(es) sem CPF — ignorados."
)
if winthor_erro:
messages.warning(
request,
f"Winthor: {winthor_erro} falha(s) ao buscar matrícula."
)
return redirect("..")
class UsuarioPerfilExtraInlineFormSet(BaseInlineFormSet):
def clean(self):
super().clean()
vistos = set()
principal = self.data.get("perfil") or getattr(self.instance, "perfil", None)
for form in self.forms:
if not hasattr(form, "cleaned_data"):
continue
if form.cleaned_data.get("DELETE"):
continue
perfil_extra = form.cleaned_data.get("perfil")
if not perfil_extra:
continue
if perfil_extra == principal:
raise ValidationError(
"Perfil extra não pode ser igual ao perfil principal."
)
if perfil_extra in vistos:
raise ValidationError(f"Perfil extra duplicado: {perfil_extra}.")
vistos.add(perfil_extra)
class UsuarioPerfilExtraInline(admin.TabularInline):
model = UsuarioPerfilExtra
formset = UsuarioPerfilExtraInlineFormSet
extra = 0
fields = ("perfil", "criado_em")
readonly_fields = ("criado_em",)
@admin.register(UsuarioSistema)
class UsuarioSistemaAdmin(admin.ModelAdmin):
list_display = (
"nome",
"matricula",
"perfil",
"perfis_ativos_display",
"ativo",
"criado_em",
)
list_filter = (
"perfil",
"ativo",
)
search_fields = (
"nome",
"matricula",
)
readonly_fields = (
"criado_em",
"atualizado_em",
)
inlines = (UsuarioPerfilExtraInline,)
fieldsets = (
("Informações Básicas", {
"fields": ("matricula", "nome", "ativo")
}),
("Permissões", {
"fields": ("perfil",)
}),
("Auditoria", {
"fields": ("criado_em", "atualizado_em"),
"classes": ("collapse",)
}),
)
def get_queryset(self, request):
qs = super().get_queryset(request)
return qs.prefetch_related("perfis_extras")
@admin.display(description="Perfis ativos")
def perfis_ativos_display(self, obj):
codigos = obj.perfis_ativos()
labels = dict(UsuarioSistema.Perfil.choices)
resolved = [str(labels.get(c, c)) for c in codigos]
return ", ".join(resolved)
@admin.register(UsuarioPerfilExtra)
class UsuarioPerfilExtraAdmin(admin.ModelAdmin):
list_display = ("usuario", "perfil", "criado_em")
list_filter = ("perfil",)
search_fields = ("usuario__nome", "usuario__matricula")
autocomplete_fields = ("usuario",)
@admin.register(HeadGestor)
class HeadGestorAdmin(admin.ModelAdmin):
list_display = ("head", "gestor")
list_filter = ("head",)
search_fields = ("head__nome", "gestor__nome", "head__matricula", "gestor__matricula")
autocomplete_fields = ("head", "gestor")
@admin.register(Solicitacao)
class SolicitacaoAdmin(admin.ModelAdmin):
list_display = [
"id",
"tipo",
"status",
"funcionario",
"solicitante",
"criado_em",
]
list_filter = ("tipo", "status", "funcionario", "solicitante")
search_fields = ("id", "tipo", "status", "funcionario", "solicitante")
readonly_fields = ("id", "criado_em", "atualizado_em")
fieldsets = (
("Informações Básicas", {
"fields": ("id", "tipo", "status", "funcionario", "solicitante")
}),
("Auditoria", {
"fields": ("criado_em", "atualizado_em"),
"classes": ("collapse",)
}),
)