#SGMP_PROD/solicitacoes/views.py import json import logging import time import uuid from datetime import date from io import BytesIO from urllib.parse import urlencode from django.contrib.auth import login, logout, get_user_model from django.contrib import messages from django.contrib.auth.decorators import login_required from django.shortcuts import get_object_or_404, redirect, render from django.http import HttpResponse from django.utils import timezone from django.core.paginator import Paginator from django.db.models import Q from django.template.loader import render_to_string from openpyxl import Workbook from .models import ( HeadGestor, PessoaRM, Solicitacao, UsuarioSistema, TipoSolicitacao, DecisaoAprovacao, StatusSolicitacao, EtapaAprovacao, matriculas_gestores_do_head, UsuarioPerfilExtra, ) from . import services from .acesso import requer_acesso_gerenciar_permissoes from .decorators import pode_criar_solicitacao, requer_perfil from solicitacoes.intf_winthor import autenticar_usuario, buscar_colaborador_oracle logger = logging.getLogger(__name__) def _qs_dashboard_unir_fila_com_minhas(qs_fila, usuario): """Fila do papel OR solicitações em que o usuário é o solicitante (distinct).""" qs_minhas = Solicitacao.objects.filter(solicitante=usuario) return (qs_fila | qs_minhas).distinct() def _queryset_dashboard_solicitacoes(usuario: UsuarioSistema): """ Queryset visível no dashboard para o usuário (antes de order_by/paginação). A seleção do ramo principal é baseada em ``usuario.perfil`` para preservar precedência do perfil principal. Perfis extras são considerados apenas como capacidades complementares dentro de cada ramo quando necessário. **Precedência** (primeiro ramo que se aplica; papéis operacionais antes de GESTOR): 1. ADMIN — todas as solicitações 2. GG ou CONTROLADORIA — fila ENVIADA (com excludes de parecer já dado) ∪ minhas 3. HEAD — fila AGUARDANDO_HEAD (gestores vinculados) ∪ minhas 4. DIRETORIA — fila AGUARDANDO_DIRETORIA ∪ minhas 5. GESTOR — apenas ``solicitante=usuario`` 6. Demais — vazio Total e pendentes no dashboard devem usar ``.count()`` sobre este mesmo queryset (pendentes = excluir FINALIZADA e REPROVADA). """ perfil_principal = usuario.perfil if perfil_principal == UsuarioSistema.Perfil.ADMIN: return Solicitacao.objects.all() if perfil_principal in ( UsuarioSistema.Perfil.GG, UsuarioSistema.Perfil.CONTROLADORIA, ): qs_fila = Solicitacao.objects.filter(status=StatusSolicitacao.ENVIADA) if perfil_principal == UsuarioSistema.Perfil.GG: qs_fila = qs_fila.exclude( pareceres__etapa=EtapaAprovacao.GG, pareceres__usuario=usuario ) elif perfil_principal == UsuarioSistema.Perfil.CONTROLADORIA: qs_fila = qs_fila.exclude( pareceres__etapa=EtapaAprovacao.CONTROLADORIA, pareceres__usuario=usuario, ) return _qs_dashboard_unir_fila_com_minhas(qs_fila, usuario) if perfil_principal == UsuarioSistema.Perfil.HEAD: matriculas = matriculas_gestores_do_head(usuario) qs_fila = Solicitacao.objects.filter(status=StatusSolicitacao.AGUARDANDO_HEAD) if matriculas: qs_fila = qs_fila.filter(solicitante__matricula__in=matriculas) return _qs_dashboard_unir_fila_com_minhas(qs_fila, usuario) if perfil_principal == UsuarioSistema.Perfil.DIRETORIA: qs_fila = Solicitacao.objects.filter(status=StatusSolicitacao.AGUARDANDO_DIRETORIA) if usuario.perfis_extras.filter(perfil=UsuarioSistema.Perfil.HEAD).exists(): qs_fila = qs_fila | Solicitacao.objects.filter(status=StatusSolicitacao.AGUARDANDO_HEAD) return _qs_dashboard_unir_fila_com_minhas(qs_fila, usuario) if perfil_principal == UsuarioSistema.Perfil.GESTOR: return Solicitacao.objects.filter(solicitante=usuario) return Solicitacao.objects.none() # #region agent log _DEBUG_LOG_PATH = "/home/f3lipe/dev/.cursor/debug-ca90b5.log" def _debug_log(hypothesis_id: str, location: str, message: str, data: dict) -> None: try: with open(_DEBUG_LOG_PATH, "a", encoding="utf-8") as f: f.write( json.dumps( { "sessionId": "ca90b5", "hypothesisId": hypothesis_id, "location": location, "message": message, "data": data, "timestamp": int(time.time() * 1000), }, ensure_ascii=False, ) + "\n" ) except Exception: pass # #endregion def get_usuario_sistema(request) -> UsuarioSistema: """ Resolve o usuário autenticado do Django para o UsuarioSistema do SGMP. """ return get_object_or_404( UsuarioSistema, matricula=request.user.username, ativo=True ) @login_required @pode_criar_solicitacao def criar_desligamento(request, pessoa_id): from .intf_sqlserver import verificar_estabilidades_colaborador funcionario = get_object_or_404(PessoaRM, id=pessoa_id) usuario = get_usuario_sistema(request) if request.method == "POST": tipo_desligamento = request.POST.get("tipo_desligamento", "").strip() aviso_previo = request.POST.get("aviso_previo", "").strip() motivo = request.POST.get("motivo", "").strip() data_prevista_str = request.POST.get("data_prevista_desligamento", "").strip() observacoes = request.POST.get("observacoes", "") or "" arquivo_pedido = request.FILES.get("arquivo_pedido") if not tipo_desligamento or not aviso_previo or not motivo or not data_prevista_str: messages.error(request, "Preencha todos os campos obrigatórios.") else: try: data_prevista = date.fromisoformat(data_prevista_str) except ValueError: messages.error(request, "Data prevista inválida. Use o formato AAAA-MM-DD.") else: try: solicitacao = services.criar_solicitacao_desligamento( solicitante=usuario, funcionario=funcionario, tipo_desligamento=tipo_desligamento, aviso_previo=aviso_previo, motivo=motivo, data_prevista_desligamento=data_prevista, observacoes=observacoes, arquivo_pedido=arquivo_pedido, ) messages.success(request, "Solicitação de desligamento criada com sucesso.") return redirect("solicitacoes:solicitacao_detalhe", solicitacao_id=solicitacao.id) except Exception: logger.exception("Erro ao criar solicitação de desligamento") messages.error(request, "Não foi possível processar a solicitação. Tente novamente ou contate o suporte.") # Verifica estabilidades para exibir na UI estabilidades = verificar_estabilidades_colaborador(funcionario.id_rm) estabilidades_bloqueantes = [e for e in estabilidades if e.get('bloqueado', False)] desligamento_bloqueado = len(estabilidades_bloqueantes) > 0 return render( request, "solicitacoes/desligamento_form.html", { "funcionario": funcionario, "estabilidades": estabilidades, "estabilidades_bloqueantes": estabilidades_bloqueantes, "desligamento_bloqueado": desligamento_bloqueado, }, ) @login_required @pode_criar_solicitacao def criar_admissao_substituicao(request, pessoa_id): from .intf_sqlserver import listar_cargos_ativos_rm, listar_secoes_ativas_rm funcionario = get_object_or_404(PessoaRM, id=pessoa_id) usuario = get_usuario_sistema(request) if request.method == "POST": data_previsao_str = request.POST.get("data_previsao", "").strip() cod_coligada = request.POST.get("cod_coligada", "").strip() cod_filial = request.POST.get("cod_filial", "").strip() cod_secao = request.POST.get("cod_secao", "").strip() cod_funcao = request.POST.get("cod_funcao", "").strip() justificativa = request.POST.get("justificativa", "").strip() if not data_previsao_str or not cod_coligada or not cod_filial or not cod_secao or not cod_funcao or not justificativa: messages.error(request, "Preencha todos os campos obrigatórios.") else: try: data_previsao = date.fromisoformat(data_previsao_str) except ValueError: messages.error(request, "Data de previsão inválida. Use o formato AAAA-MM-DD.") else: dados = { "data_previsao_contratacao": data_previsao, "cod_coligada_destino": cod_coligada, "cod_filial_destino": cod_filial, "cod_secao_destino": cod_secao, "cod_funcao_destino": cod_funcao, "justificativa": justificativa, } try: solicitacao = services.criar_solicitacao_substituicao( solicitante=usuario, funcionario_substituido=funcionario, dados_admissao=dados, ) messages.success(request, "Solicitação de admissão por substituição criada.") return redirect("solicitacoes:solicitacao_detalhe", solicitacao_id=solicitacao.id) except Exception: logger.exception("Erro ao criar solicitação de admissão por substituição") messages.error(request, "Não foi possível processar a solicitação. Tente novamente ou contate o suporte.") # Busca cargos e seções ativos do RM cargos = listar_cargos_ativos_rm() secoes = listar_secoes_ativas_rm() return render( request, "solicitacoes/admissao_substituicao_form.html", { "funcionario": funcionario, "cargos": cargos, "secoes": secoes, }, ) @login_required @pode_criar_solicitacao def criar_admissao_aumento_quadro(request): from .intf_sqlserver import ( listar_cargos_ativos_rm, listar_secoes_ativas_rm, listar_coligadas_rm, ) usuario = get_usuario_sistema(request) if request.method == "POST": data_previsao_str = request.POST.get("data_previsao", "").strip() cod_coligada = request.POST.get("cod_coligada", "").strip() cod_filial = request.POST.get("cod_filial", "").strip() cod_secao = request.POST.get("cod_secao", "").strip() cod_funcao = request.POST.get("cod_funcao", "").strip() justificativa = request.POST.get("justificativa", "").strip() if not data_previsao_str or not cod_coligada or not cod_filial or not cod_secao or not cod_funcao or not justificativa: messages.error(request, "Preencha todos os campos obrigatórios.") else: try: data_previsao = date.fromisoformat(data_previsao_str) except ValueError: messages.error(request, "Data de previsão inválida. Use o formato AAAA-MM-DD.") else: dados = { "data_previsao_contratacao": data_previsao, "cod_coligada_destino": cod_coligada, "cod_filial_destino": cod_filial, "cod_secao_destino": cod_secao, "cod_funcao_destino": cod_funcao, "justificativa_estrategica": justificativa, } try: solicitacao = services.criar_solicitacao_aumento_quadro( solicitante=usuario, dados_admissao=dados, ) messages.success(request, "Solicitação de aumento de quadro criada.") return redirect("solicitacoes:solicitacao_detalhe", solicitacao_id=solicitacao.id) except Exception: logger.exception("Erro ao criar solicitação de aumento de quadro") messages.error(request, "Não foi possível processar a solicitação. Tente novamente ou contate o suporte.") # Busca cargos, seções e coligadas ativos do RM (filial é informada manualmente, 1–13) cargos = listar_cargos_ativos_rm() secoes = listar_secoes_ativas_rm() coligadas = listar_coligadas_rm() return render( request, "solicitacoes/admissao_aumento_form.html", { "cargos": cargos, "secoes": secoes, "coligadas": coligadas, }, ) @login_required @pode_criar_solicitacao def criar_movimentacao(request, pessoa_id): from .intf_sqlserver import listar_cargos_ativos_rm, listar_secoes_ativas_rm funcionario = get_object_or_404(PessoaRM, id=pessoa_id) usuario = get_usuario_sistema(request) if request.method == "POST": data_efetivacao_str = request.POST.get("data_efetivacao", "").strip() justificativa = request.POST.get("justificativa", "").strip() if not data_efetivacao_str or not justificativa: messages.error(request, "Preencha data de efetivação e justificativa.") else: try: data_efetivacao = date.fromisoformat(data_efetivacao_str) except ValueError: messages.error(request, "Data de efetivação inválida. Use o formato AAAA-MM-DD.") else: dados = { "altera_funcao": bool(request.POST.get("altera_funcao")), "altera_centro_custo": bool(request.POST.get("altera_centro_custo")), "novo_cod_funcao": request.POST.get("novo_cod_funcao"), "novo_cod_secao": request.POST.get("novo_cod_secao"), "novo_salario": request.POST.get("novo_salario") or None, "data_efetivacao": data_efetivacao, "justificativa": justificativa, } try: solicitacao = services.criar_solicitacao_movimentacao( solicitante=usuario, funcionario=funcionario, dados_movimentacao=dados, ) messages.success(request, "Solicitação de movimentação criada.") return redirect("solicitacoes:solicitacao_detalhe", solicitacao_id=solicitacao.id) except Exception: logger.exception("Erro ao criar solicitação de movimentação") messages.error(request, "Não foi possível processar a solicitação. Tente novamente ou contate o suporte.") # Busca cargos e seções ativos do RM cargos = listar_cargos_ativos_rm() secoes = listar_secoes_ativas_rm() return render( request, "solicitacoes/movimentacao_form.html", { "funcionario": funcionario, "cargos": cargos, "secoes": secoes, }, ) @login_required def enviar_solicitacao(request, solicitacao_id): solicitacao = get_object_or_404(Solicitacao, id=solicitacao_id) usuario = get_usuario_sistema(request) try: services.enviar_solicitacao(solicitacao, usuario) messages.success(request, "Solicitação enviada para aprovação.") except Exception: logger.exception("Erro ao enviar solicitação") messages.error(request, "Não foi possível enviar a solicitação. Tente novamente ou contate o suporte.") return redirect("solicitacoes:solicitacao_detalhe", solicitacao_id=solicitacao.id) @login_required @requer_perfil( UsuarioSistema.Perfil.HEAD, UsuarioSistema.Perfil.DIRETORIA, UsuarioSistema.Perfil.ADMIN, ) def decidir_solicitacao(request, solicitacao_id): """ View para HEAD ou DIRETORIA aprovar/reprovar solicitações. HEAD atua quando status é AGUARDANDO_HEAD; DIRETORIA quando é AGUARDANDO_DIRETORIA. """ solicitacao = get_object_or_404(Solicitacao, id=solicitacao_id) usuario = get_usuario_sistema(request) if request.method == "POST": decisao = request.POST.get("decisao", "").strip() justificativa = request.POST.get("justificativa", "").strip() try: if ( solicitacao.status == StatusSolicitacao.AGUARDANDO_HEAD and ( usuario.tem_perfil(UsuarioSistema.Perfil.HEAD) or usuario.tem_perfil(UsuarioSistema.Perfil.ADMIN) ) ): services.aprovar_reprovar_por_head( solicitacao=solicitacao, aprovador=usuario, decisao=decisao, justificativa=justificativa, ) messages.success(request, "Decisão registrada com sucesso.") elif ( solicitacao.status == StatusSolicitacao.AGUARDANDO_DIRETORIA and ( usuario.tem_perfil(UsuarioSistema.Perfil.DIRETORIA) or usuario.tem_perfil(UsuarioSistema.Perfil.ADMIN) ) ): services.aprovar_reprovar_solicitacao( solicitacao=solicitacao, aprovador=usuario, decisao=decisao, justificativa=justificativa, ) messages.success(request, "Decisão registrada com sucesso.") else: messages.error(request, "Solicitação não está aguardando sua decisão.") except Exception: logger.exception("Erro ao registrar decisão da solicitação") messages.error(request, "Não foi possível registrar a decisão. Tente novamente ou contate o suporte.") return redirect("solicitacoes:solicitacao_detalhe", solicitacao_id=solicitacao.id) @login_required @requer_perfil( UsuarioSistema.Perfil.GG, UsuarioSistema.Perfil.CONTROLADORIA, UsuarioSistema.Perfil.ADMIN, ) def registrar_parecer_view(request, solicitacao_id): """ View para GG e CONTROLADORIA registrarem pareceres. Permite anexar arquivos junto com o parecer. """ solicitacao = get_object_or_404(Solicitacao, id=solicitacao_id) usuario = get_usuario_sistema(request) if request.method == "POST": texto = request.POST.get("texto", "").strip() anexo = request.FILES.get("anexo") if not texto: messages.error(request, "O parecer não pode estar vazio.") else: try: services.registrar_parecer( solicitacao=solicitacao, usuario=usuario, texto=texto, anexo=anexo ) messages.success(request, "Parecer registrado com sucesso.") except Exception: logger.exception("Erro ao registrar parecer") messages.error(request, "Não foi possível registrar o parecer. Tente novamente ou contate o suporte.") return redirect("solicitacoes:solicitacao_detalhe", solicitacao_id=solicitacao.id) @login_required def solicitacao_detalhe(request, solicitacao_id): solicitacao = get_object_or_404(Solicitacao, id=solicitacao_id) usuario = get_usuario_sistema(request) # Verifica se o usuário é o solicitante is_solicitante = solicitacao.solicitante.id == usuario.id # Verifica se pode aprovar (apenas pela etapa/perfil, não bloqueia o solicitante) pode_aprovar = solicitacao.pode_aprovar(usuario) # Verifica se pode dar parecer pode_dar_parecer = solicitacao.pode_dar_parecer(usuario) # Busca pareceres existentes pareceres_gg = solicitacao.pareceres.filter(etapa=EtapaAprovacao.GG) pareceres_controladoria = solicitacao.pareceres.filter(etapa=EtapaAprovacao.CONTROLADORIA) # Calcula horas do banco de horas se houver funcionário horas_banco_horas = None if solicitacao.funcionario and solicitacao.funcionario.saldo_banco_horas_minutos is not None: try: horas_banco_horas = float(solicitacao.funcionario.saldo_banco_horas_minutos) / 60.0 except (ValueError, TypeError): horas_banco_horas = None # Busca dados do Winthor se houver funcionário com CPF dados_winthor = None dados_winthor_organizados = None if solicitacao.funcionario and solicitacao.funcionario.cpf: try: dados_winthor = buscar_colaborador_oracle(solicitacao.funcionario.cpf) if dados_winthor: dados_winthor_organizados = { "basicos": { "matricula": dados_winthor.get("matricula"), "nome": dados_winthor.get("nome"), "cpf": dados_winthor.get("cpf"), }, "admissao": { "admissao": dados_winthor.get("admissao"), "situacao": dados_winthor.get("situacao"), "dt_exclusao": dados_winthor.get("dt_exclusao"), }, "endereco": { "endereco": dados_winthor.get("endereco"), "bairro": dados_winthor.get("bairro"), "cidade": dados_winthor.get("cidade"), "estado": dados_winthor.get("estado"), } } except Exception: logger.exception("Erro ao buscar dados do Winthor") return render( request, "solicitacoes/solicitacao_detalhe.html", { "solicitacao": solicitacao, "status_display_viewer": solicitacao.get_status_display_para_usuario(usuario), "is_solicitante": is_solicitante, "pode_aprovar": pode_aprovar, "pode_dar_parecer": pode_dar_parecer, "pareceres_gg": pareceres_gg, "pareceres_controladoria": pareceres_controladoria, "horas_banco_horas": horas_banco_horas, "dados_winthor": dados_winthor, "dados_winthor_organizados": dados_winthor_organizados, }, ) @login_required def solicitacao_comprovante_pdf(request, solicitacao_id): """ Gera um comprovante em PDF (download) a partir do contexto da solicitação. Gera PDF com WeasyPrint a partir do HTML renderizado do comprovante. """ from weasyprint import HTML solicitacao = get_object_or_404(Solicitacao, id=solicitacao_id) pareceres_gg = solicitacao.pareceres.filter(etapa=EtapaAprovacao.GG) pareceres_controladoria = solicitacao.pareceres.filter(etapa=EtapaAprovacao.CONTROLADORIA) horas_banco_horas = None if solicitacao.funcionario and solicitacao.funcionario.saldo_banco_horas_minutos is not None: try: horas_banco_horas = float(solicitacao.funcionario.saldo_banco_horas_minutos) / 60.0 except (ValueError, TypeError): horas_banco_horas = None dados_winthor = None dados_winthor_organizados = None if solicitacao.funcionario and solicitacao.funcionario.cpf: try: dados_winthor = buscar_colaborador_oracle(solicitacao.funcionario.cpf) if dados_winthor: dados_winthor_organizados = { "basicos": { "matricula": dados_winthor.get("matricula"), "nome": dados_winthor.get("nome"), "cpf": dados_winthor.get("cpf"), }, "admissao": { "admissao": dados_winthor.get("admissao"), "situacao": dados_winthor.get("situacao"), "dt_exclusao": dados_winthor.get("dt_exclusao"), }, "endereco": { "endereco": dados_winthor.get("endereco"), "bairro": dados_winthor.get("bairro"), "cidade": dados_winthor.get("cidade"), "estado": dados_winthor.get("estado"), }, } except Exception: logger.exception("Erro ao buscar dados do Winthor no comprovante PDF") context = { "solicitacao": solicitacao, "pareceres_gg": pareceres_gg, "pareceres_controladoria": pareceres_controladoria, "horas_banco_horas": horas_banco_horas, "dados_winthor": dados_winthor, "dados_winthor_organizados": dados_winthor_organizados, "gerado_em": timezone.now(), } html_string = render_to_string("solicitacoes/solicitacao_comprovante_pdf.html", context) short_id = str(solicitacao.id)[:8] filename = f"comprovante-{short_id}.pdf" try: pdf_bytes = HTML(string=html_string, base_url=request.build_absolute_uri()).write_pdf() except Exception: logger.exception("Erro ao gerar PDF do comprovante da solicitação %s", solicitacao.id) return HttpResponse("Erro ao gerar PDF do comprovante.", status=500) response = HttpResponse(pdf_bytes, content_type="application/pdf") response["Content-Disposition"] = f'attachment; filename="{filename}"' return response User = get_user_model() def login_view(request): if request.user.is_authenticated: # CORREÇÃO AQUI: adicionado solicitacoes: return redirect("solicitacoes:dashboard") if request.method == "POST": login_input = request.POST.get("username", "").strip() senha = request.POST.get("password", "").strip() next_get = request.GET.get("next") next_post = request.POST.get("next") # #region agent log _debug_log( "H2", "views.login_view:POST", "login_post_received", { "has_user": bool(login_input), "has_pass": bool(senha), "next_from_get": next_get, "next_from_post": next_post, }, ) # #endregion if not login_input or not senha: messages.error(request, "Informe usuário e senha.") return render(request, "auth/login.html") # Chama a autenticação do Winthor # Espera retorno dict: {'matricula': '123', 'nome': 'Fulano', 'usuariobd': 'FULANO.SILVA'} dados = autenticar_usuario(login_input, senha) if not dados: # #region agent log _debug_log( "H3", "views.login_view:POST", "winthor_auth_failed", {"matricula_len": len(login_input)}, ) # #endregion messages.error(request, "Usuário ou senha inválidos no Winthor.") return render(request, "auth/login.html") # TRUQUE DE INTEGRAÇÃO: # Salvamos o User do Django usando a MATRÍCULA como username. # Isso garante que o get_usuario_sistema funcione corretamente. user, _ = User.objects.get_or_create( username=str(dados["matricula"]), defaults={ "first_name": dados.get("nome", "Usuario").split(" ")[0], }, ) # Loga no Django (sessão) login(request, user) # Atualiza/Cria o UsuarioSistema (Domínio) # Se já existe, mantém o perfil atual. Se é novo, define como GESTOR por padrão usuario_sistema, created = UsuarioSistema.objects.get_or_create( matricula=str(dados["matricula"]), defaults={ "nome": dados["nome"], "ativo": True, "perfil": UsuarioSistema.Perfil.GESTOR, # Default para novos usuários }, ) # Se já existia, apenas atualiza o nome (mantém perfil e status) if not created: usuario_sistema.nome = dados["nome"] usuario_sistema.ativo = True usuario_sistema.save() messages.success(request, f"Bem-vindo, {dados['nome']}!") # O 'next' pega a url que o usuário tentou acessar antes de logar next_url = request.GET.get("next", "solicitacoes:dashboard") # #region agent log _debug_log( "H1", "views.login_view:POST", "before_redirect", { "next_url_resolved": str(next_url)[:500], "next_post_ignored": (request.POST.get("next") or "")[:200], "session_key": getattr(request.session, "session_key", None), "user_auth": request.user.is_authenticated, "user_id": getattr(request.user, "id", None), }, ) # #endregion return redirect(next_url) return render(request, "auth/login.html") @login_required def logout_view(request): logout(request) messages.info(request, "Você saiu do sistema.") return redirect("solicitacoes:login") @login_required def dashboard_view(request): usuario = get_usuario_sistema(request) qs_base = _queryset_dashboard_solicitacoes(usuario) solicitacoes = qs_base.order_by("-criado_em").prefetch_related("pareceres") finalizados = [StatusSolicitacao.FINALIZADA, StatusSolicitacao.REPROVADA] total = qs_base.count() pendentes = qs_base.exclude(status__in=finalizados).count() # Paginação paginator = Paginator(solicitacoes, 10) page = request.GET.get('page') solicitacoes_page = paginator.get_page(page) # Prepara informações sobre quais solicitações podem ser aprovadas ou receber parecer solicitacoes_com_acao = [] for solicitacao in solicitacoes_page: is_solicitante = solicitacao.solicitante.id == usuario.id pode_aprovar = solicitacao.pode_aprovar(usuario) pode_dar_parecer = solicitacao.pode_dar_parecer(usuario) # Busca dados do Winthor se houver funcionário com CPF dados_winthor_organizados = None if solicitacao.funcionario and solicitacao.funcionario.cpf: try: dados_winthor = buscar_colaborador_oracle(solicitacao.funcionario.cpf) if dados_winthor: dados_winthor_organizados = { "basicos": { "matricula": dados_winthor.get("matricula"), "nome": dados_winthor.get("nome"), "cpf": dados_winthor.get("cpf"), }, "admissao": { "admissao": dados_winthor.get("admissao"), "situacao": dados_winthor.get("situacao"), "dt_exclusao": dados_winthor.get("dt_exclusao"), }, "endereco": { "endereco": dados_winthor.get("endereco"), "bairro": dados_winthor.get("bairro"), "cidade": dados_winthor.get("cidade"), "estado": dados_winthor.get("estado"), } } except Exception: # Ignora erros silenciosamente no dashboard pass solicitacoes_com_acao.append({ 'solicitacao': solicitacao, 'status_display_viewer': solicitacao.get_status_display_para_usuario(usuario), 'pode_aprovar': pode_aprovar, 'pode_dar_parecer': pode_dar_parecer, 'is_solicitante': is_solicitante, 'dados_winthor_organizados': dados_winthor_organizados, }) return render(request, "dashboard.html", { "solicitacoes": solicitacoes_page, "solicitacoes_com_acao": solicitacoes_com_acao, "total": total, "pendentes": pendentes, }) @login_required def todas_solicitacoes_view(request): """Listagem de solicitações: Gestor não acessa; Head vê só dos gestores vinculados a ele; GG/Controladoria/Diretoria veem todas.""" usuario = get_usuario_sistema(request) if usuario.eh_apenas_gestor(): return redirect("solicitacoes:dashboard") qs_base, filtro_status, busca = _build_solicitacoes_queryset(request, usuario) total = qs_base.count() # Paginação paginator = Paginator(qs_base, 20) page = request.GET.get("page") solicitacoes_page = paginator.get_page(page) # Contexto de ação por solicitação (pode_aprovar, pode_dar_parecer, etc.) solicitacoes_com_acao = [] for solicitacao in solicitacoes_page: is_solicitante = solicitacao.solicitante.id == usuario.id pode_aprovar = solicitacao.pode_aprovar(usuario) pode_dar_parecer = solicitacao.pode_dar_parecer(usuario) dados_winthor_organizados = None if solicitacao.funcionario and solicitacao.funcionario.cpf: try: dados_winthor = buscar_colaborador_oracle(solicitacao.funcionario.cpf) if dados_winthor: dados_winthor_organizados = { "basicos": { "matricula": dados_winthor.get("matricula"), "nome": dados_winthor.get("nome"), "cpf": dados_winthor.get("cpf"), }, "admissao": { "admissao": dados_winthor.get("admissao"), "situacao": dados_winthor.get("situacao"), "dt_exclusao": dados_winthor.get("dt_exclusao"), }, "endereco": { "endereco": dados_winthor.get("endereco"), "bairro": dados_winthor.get("bairro"), "cidade": dados_winthor.get("cidade"), "estado": dados_winthor.get("estado"), }, } except Exception: pass solicitacoes_com_acao.append({ "solicitacao": solicitacao, "status_display_viewer": solicitacao.get_status_display_para_usuario(usuario), "pode_aprovar": pode_aprovar, "pode_dar_parecer": pode_dar_parecer, "is_solicitante": is_solicitante, "dados_winthor_organizados": dados_winthor_organizados, }) _exp_params = {} if filtro_status: _exp_params["status"] = filtro_status if busca: _exp_params["q"] = busca export_query = ("?" + urlencode(_exp_params)) if _exp_params else "" return render(request, "solicitacoes/todas_solicitacoes.html", { "solicitacoes": solicitacoes_page, "solicitacoes_com_acao": solicitacoes_com_acao, "total": total, "filtro_status": filtro_status, "busca": busca, "export_query": export_query, "status_choices": StatusSolicitacao.choices, }) def _build_solicitacoes_queryset(request, usuario: UsuarioSistema): """Queryset base para listagem/exportação da tela 'Todas as Solicitações'.""" qs_base = Solicitacao.objects.all().order_by("-criado_em") # Head: apenas solicitações dos gestores que ele aprova (subordinados imediatos) if usuario.tem_perfil(UsuarioSistema.Perfil.HEAD) and not usuario.tem_perfil( UsuarioSistema.Perfil.ADMIN ): matriculas = matriculas_gestores_do_head(usuario) if matriculas: qs_base = qs_base.filter(solicitante__matricula__in=matriculas) else: qs_base = qs_base.none() filtro_status = request.GET.get("status", "").strip() if filtro_status: qs_base = qs_base.filter(status=filtro_status) busca = request.GET.get("q", "").strip() if busca: q_filter = ( Q(funcionario__nome__icontains=busca) | Q(solicitante__nome__icontains=busca) | Q(tipo__icontains=busca) ) if len(busca) == 36: try: q_filter |= Q(id=uuid.UUID(busca)) except ValueError: pass qs_base = qs_base.filter(q_filter) return qs_base, filtro_status, busca @login_required def exportar_todas_solicitacoes_xlsx(request): """Exporta todas as solicitações visíveis ao usuário com os filtros atuais.""" usuario = get_usuario_sistema(request) if usuario.eh_apenas_gestor(): return redirect("solicitacoes:dashboard") qs_base, filtro_status, busca = _build_solicitacoes_queryset(request, usuario) wb = Workbook() ws = wb.active ws.title = "Solicitacoes" ws.append( [ "ID", "Tipo", "Colaborador", "Solicitante", "Status", "Criada em", ] ) def excel_text(value): if value is None: return "" return str(value) for solicitacao in qs_base.iterator(): ws.append( [ excel_text(solicitacao.id), excel_text(solicitacao.get_tipo_display()), excel_text( solicitacao.funcionario.nome if solicitacao.funcionario else "" ), excel_text(solicitacao.solicitante.nome), excel_text(solicitacao.get_status_display_para_usuario(usuario)), excel_text( timezone.localtime(solicitacao.criado_em).strftime("%d/%m/%Y %H:%M") ), ] ) output = BytesIO() wb.save(output) output.seek(0) suffix = f"_{filtro_status}" if filtro_status else "" filename = f"solicitacoes_todas{suffix}.xlsx" response = HttpResponse( output.getvalue(), content_type=( "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" ), ) response["Content-Disposition"] = f'attachment; filename="{filename}"' return response @login_required @pode_criar_solicitacao def listar_colaboradores(request): """ Lista colaboradores para seleção ao criar solicitação. Aceita parâmetro 'tipo' na URL: - 'substituicao': busca colaboradores diretamente no RM (todas as situações; admissão por substituição) - outros ou ausente: busca pessoas que não estão desligadas do banco local (padrão) """ from .intf_sqlserver import buscar_colaboradores_rm from .models import PessoaRM # Verifica se é para admissão por substituição tipo = request.GET.get('tipo', '') apenas_desligados = (tipo == 'substituicao') busca = request.GET.get('q', '') # Para admissão por substituição: busca direto no RM (todas as situações) if apenas_desligados: # Busca direto no SQL Server para garantir dados atualizados resultados_rm = buscar_colaboradores_rm(nome=busca if busca else None) # Converte resultados do RM para formato compatível com o template colaboradores_list = [] for row in resultados_rm: id_rm = f"{row['CODCOLIGADA']}-{row['CHAPA']}" # Tenta encontrar no banco local ou sincroniza try: pessoa = PessoaRM.objects.get(id_rm=id_rm) except PessoaRM.DoesNotExist: # Sincroniza a pessoa no banco local com os dados do RM # pymssql retorna chaves em maiúsculas por padrão pessoa, _ = PessoaRM.objects.update_or_create( id_rm=id_rm, defaults={ "matricula": row['CHAPA'], "nome": row['NOME'], "cpf": row.get('CPF'), "cargo": row['FUNCAO'], "setor": row['SECAO'], "centro_custo": row['CODSECAO'], "situacao": row['CODSITUACAO'], "cod_funcao": row.get('CODFUNCAO'), "salario": row.get('SALARIO'), "cod_sindicato": row.get('CODSINDICATO'), } ) colaboradores_list.append(pessoa) # Paginação manual paginator = Paginator(colaboradores_list, 20) page = request.GET.get('page') colaboradores_page = paginator.get_page(page) else: # Padrão: busca pessoas que não estão desligadas do banco local colaboradores = PessoaRM.objects.exclude(situacao='D').order_by('nome') # Busca por nome ou matrícula if busca: colaboradores = colaboradores.filter( nome__icontains=busca ) | colaboradores.filter( matricula__icontains=busca ) # Paginação paginator = Paginator(colaboradores, 20) page = request.GET.get('page') colaboradores_page = paginator.get_page(page) return render(request, "solicitacoes/listar_colaboradores.html", { "colaboradores": colaboradores_page, "busca": busca, "tipo": tipo, "apenas_desligados": apenas_desligados, }) @requer_acesso_gerenciar_permissoes def gerenciar_permissoes(request): """View para gerenciar permissões de usuários. Para perfil HEAD, permite vincular gestores (em relação a quem o Head aprova).""" usuario_atual = get_usuario_sistema(request) # Pré-carrega perfis extras para evitar N+1 todos_usuarios = list( UsuarioSistema.objects.all() .prefetch_related("perfis_extras") .order_by("nome") ) # Gestores são usuários que possuem o perfil GESTOR (principal ou extra) gestores_lista = [u for u in todos_usuarios if u.tem_perfil(UsuarioSistema.Perfil.GESTOR)] usuarios = todos_usuarios # Busca busca = request.GET.get('q', '') if busca: usuarios = usuarios.filter( nome__icontains=busca ) | usuarios.filter( matricula__icontains=busca ) if request.method == "POST": # Salvar gestores vinculados ao Head (formulário "Este Head aprova os gestores:") gestores_head_id = request.POST.get("gestores_head_id") if gestores_head_id: try: head = UsuarioSistema.objects.get(id=gestores_head_id) if not head.tem_perfil(UsuarioSistema.Perfil.HEAD): messages.error(request, "Usuário selecionado não possui perfil de Head.") else: gestores_ids = request.POST.getlist("gestores_ids") HeadGestor.objects.filter(head=head).delete() # Mantém apenas vínculos com usuários que possuem perfil de Gestor (principal ou extra) candidatos = UsuarioSistema.objects.filter(id__in=gestores_ids).prefetch_related("perfis_extras") for gestor in candidatos: if gestor.tem_perfil(UsuarioSistema.Perfil.GESTOR): HeadGestor.objects.get_or_create(head=head, gestor=gestor) messages.success(request, f"Gestores vinculados ao Head {head.nome} atualizados.") except UsuarioSistema.DoesNotExist: messages.error(request, "Head não encontrado.") except Exception: logger.exception("Erro ao salvar vínculos Head-Gestor") messages.error(request, "Não foi possível salvar os vínculos. Tente novamente ou contate o suporte.") return redirect("solicitacoes:gerenciar_permissoes") # Atualização de perfil principal + perfis extras usuario_id = request.POST.get("usuario_id") novo_perfil = request.POST.get("perfil") if usuario_id and novo_perfil: try: usuario_editado = UsuarioSistema.objects.get(id=usuario_id) usuario_editado.perfil = novo_perfil usuario_editado.save() # Perfis extras selecionados no formulário perfis_extras_selecionados = set(request.POST.getlist("perfis_extras")) # Remove eventual duplicidade com o perfil principal perfis_extras_selecionados.discard(novo_perfil) # Sincroniza perfis extras no banco UsuarioPerfilExtra.objects.filter(usuario=usuario_editado).exclude( perfil__in=perfis_extras_selecionados ).delete() for codigo_perfil in perfis_extras_selecionados: UsuarioPerfilExtra.objects.get_or_create( usuario=usuario_editado, perfil=codigo_perfil, ) if usuario_editado.id == usuario_atual.id: messages.success( request, f"Seu perfil foi atualizado para: {usuario_editado.get_perfil_display()}", ) else: messages.success( request, f"Perfil de {usuario_editado.nome} atualizado para: {usuario_editado.get_perfil_display()}", ) except UsuarioSistema.DoesNotExist: messages.error(request, "Usuário não encontrado.") except Exception: logger.exception("Erro ao atualizar perfil") messages.error(request, "Não foi possível atualizar o perfil. Tente novamente ou contate o suporte.") return redirect("solicitacoes:gerenciar_permissoes") # Lista de gestores (para o multi-select / listas dos Heads) gestores = gestores_lista # Paginação paginator = Paginator(usuarios, 20) page = request.GET.get('page') usuarios_page = paginator.get_page(page) # head_id -> lista de gestor_id já vinculados (apenas para usuários da página atual) head_gestores = {} for row in HeadGestor.objects.filter(head__in=list(usuarios_page)).values_list("head_id", "gestor_id"): head_gestores.setdefault(str(row[0]), []).append(str(row[1])) # Para o template: cada item tem usuario, seus perfis extras e lista (gestor, selected) para o multi-select HEAD usuarios_com_gestores = [ { "usuario": u, "perfis_extras": list(u.perfis_extras.values_list("perfil", flat=True)), "gestores_com_selecao": [ (g, str(g.id) in head_gestores.get(str(u.id), [])) for g in gestores ], } for u in usuarios_page ] return render(request, "solicitacoes/gerenciar_permissoes.html", { "usuarios_com_gestores": usuarios_com_gestores, "busca": busca, "perfis": UsuarioSistema.Perfil.choices, "gestores": gestores, })