Browse Source

Adiciona tipos e resolve warnings

migracao
Marcio Mazza 2 years ago
parent
commit
78ff72ae2d
  1. 163
      sapl/legacy/migracao_dados.py

163
sapl/legacy/migracao_dados.py

@ -4,11 +4,13 @@ import os
import re
import subprocess
import traceback
from collections import OrderedDict, defaultdict, namedtuple
from collections import OrderedDict, defaultdict
from dataclasses import dataclass
from datetime import date
from functools import lru_cache, partial
from itertools import groupby
from operator import xor
from typing import Type, Union
import git
import pkg_resources
@ -23,9 +25,8 @@ from django.contrib.auth import get_user_model
from django.contrib.auth.models import Group
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import ObjectDoesNotExist
from django.core.management.commands.flush import Command as FlushCommand
from django.db import connections, transaction
from django.db.models import Max, Q
from django.db.models import Field, Max, Model, Q
from pyaml import UnsafePrettyYAMLDumper
from reversion.models import Revision, Version
from sapl.base.models import AppConfig as AppConf
@ -33,7 +34,7 @@ from sapl.base.models import Autor, TipoAutor
from sapl.base.receivers import cria_models_tipo_autor
from sapl.comissoes.models import Comissao, Composicao, Participacao, Reuniao
from sapl.legacy.models import NormaJuridica as OldNormaJuridica
from sapl.legacy.models import Numeracao, TipoNumeracaoProtocolo
from sapl.legacy.models import TipoNumeracaoProtocolo
from sapl.legacy_migration_settings import (
DIR_DADOS_MIGRACAO,
DIR_REPO,
@ -121,17 +122,20 @@ for a1, s1 in name_sets:
# RENAMES ###################################################################
MODEL_RENAME_PATTERN = re.compile("(.+) \((.+)\)")
MODEL_RENAME_PATTERN = re.compile(r"(.+) \((.+)\)")
MODEL_RENAME_INCLUDE_PATTERN = re.compile("<(.+)>")
ModelType = Type[Model]
def get_renames():
field_renames = {}
model_renames = {}
field_renames: dict[ModelType, dict[str, str]] = {}
model_renames: dict[ModelType, str] = {}
includes = {}
for app in appconfs:
app_rename_data = yaml.safe_load(
pkg_resources.resource_string(app.module.__name__, "legacy.yaml")
pkg_resources.resource_string(app.module.__name__, "legacy.yaml") # type: ignore
)
for model_name, renames in app_rename_data.items():
# armazena ou substitui includes
@ -156,13 +160,21 @@ def get_renames():
field_renames, model_renames = get_renames()
legacy_app = apps.get_app_config("legacy")
models_novos_para_antigos = {
models_novos_para_antigos: dict[ModelType, ModelType] = {
model: legacy_app.get_model(model_renames.get(model, model.__name__))
for model in field_renames
}
models_novos_para_antigos[Composicao] = models_novos_para_antigos[Participacao]
campos_novos_para_antigos = {
@dataclass(frozen=True)
class CampoVirtual:
model: ModelType
related_model: ModelType
null = True
campos_novos_para_antigos: dict[Union[Field, CampoVirtual], str] = {
model._meta.get_field(nome_novo): nome_antigo
for model, renames in field_renames.items()
for nome_novo, nome_antigo in renames.items()
@ -177,10 +189,6 @@ for nome_novo, nome_antigo in (
# campos virtuais de Proposicao para funcionar com get_fk_related
class CampoVirtual(namedtuple("CampoVirtual", "model related_model")):
null = True
CAMPOS_VIRTUAIS_PROPOSICAO = {
TipoMateriaLegislativa: CampoVirtual(Proposicao, MateriaLegislativa),
TipoDocumento: CampoVirtual(Proposicao, DocumentoAcessorio),
@ -227,7 +235,7 @@ def warn(tipo, msg, dados):
@lru_cache()
def get_pk_legado(tabela):
def get_pk_legado(tabela: str) -> tuple[str, ...]:
if tabela == "despacho_inicial":
# adaptação para deleção correta no mysql ao final de migrar_model
# acompanha o agrupamento de despacho_inicial feito em iter_sql_records
@ -242,11 +250,11 @@ def get_pk_legado(tabela):
# mas essa parece sempre ser uma chave candidata
return "cod_parlamentar", "cod_sessao_leg", "cod_cargo"
res = exec_legado('show index from {} WHERE Key_name = "PRIMARY"'.format(tabela))
return [r[4] for r in res]
return [r[4] for r in res] # type: ignore
@lru_cache()
def get_estrutura_legado(model):
def get_estrutura_legado(model: ModelType) -> tuple[ModelType, str, tuple[str]]:
model_legado = models_novos_para_antigos[model]
tabela_legado = model_legado._meta.db_table
campos_pk_legado = get_pk_legado(tabela_legado)
@ -1002,7 +1010,7 @@ def criar_configuracao_inicial():
sequencia_numeracao = "U"
else:
sequencia_numeracao = "A"
appconf = AppConf(sequencia_numeracao_protocolo=sequencia_numeracao)
appconf = AppConf(sequencia_numeracao_protocolo=sequencia_numeracao) # type: ignore
appconf.save()
@ -1026,7 +1034,7 @@ def reinicia_sequence(model, ultima_pk_legado):
)
REPO = git.Repo.init(DIR_REPO)
REPO = git.Repo.init(DIR_REPO) # type: ignore
def populate_renamed_fields(new, old):
@ -1058,11 +1066,11 @@ def populate_renamed_fields(new, old):
if field_type == "DateTimeField" and value:
# as datas armazenadas no legado na verdade são naive
sem_tz = value.replace(tzinfo=None)
sem_tz = value.replace(tzinfo=None) # type: ignore
value = PYTZ_TIMEZONE.localize(sem_tz).astimezone(pytz.utc)
if field_type == "TimeField" and value:
value = value.replace(tzinfo=PYTZ_TIMEZONE)
value = value.replace(tzinfo=PYTZ_TIMEZONE) # type: ignore
setattr(new, field.name, value)
@ -1080,6 +1088,8 @@ def get_arquivos_ajustes_pre_migracao():
def do_flush():
# from django.core.management.commands.flush import Command as FlushCommand
#
# excluindo database antigo.
# info("Excluindo entradas antigas do banco destino.")
# FlushCommand().handle(
@ -1154,7 +1164,7 @@ def migrar_dados(primeira_migracao=False, apagar_do_legado=False):
info("Começando migração: ...")
migrar_todos_os_models(apagar_do_legado)
except Exception as e:
ocorrencias["traceback"] = str(traceback.format_exc())
ocorrencias["traceback"] = str(traceback.format_exc()) # type: ignore
raise e
finally:
# congela e grava ocorrências
@ -1220,7 +1230,7 @@ def migrar_model(model, apagar_do_legado):
# A PK NO LEGADO TEM UM ÚNICO CAMPO
nome_pk = model_legado._meta.pk.name
nome_pk = model_legado._meta.pk.name # type: ignore
if "ind_excluido" in {f.name for f in model_legado._meta.fields}:
# se o model legado tem o campo ind_excluido
# enumera apenas os não excluídos
@ -1238,7 +1248,7 @@ def migrar_model(model, apagar_do_legado):
int(v.object_id) for v in Version.objects.get_deleted(model)
]
def ja_esta_migrado(old):
def ja_esta_migrado(old): # type: ignore
id = get_id_do_legado(old)
return id in ids_ja_migrados or id in apagados_pelo_usuario
@ -1250,22 +1260,7 @@ def migrar_model(model, apagar_do_legado):
# A PK NO LEGADO TEM MAIS DE UM CAMPO
old_records = iter_sql_records(tabela_legado)
get_id_do_legado = None
renames = field_renames[model]
campos_velhos_p_novos = {v: k for k, v in renames.items()}
if model_legado == Numeracao:
# nao usamos cod_numeracao no 3.1 => apelamos p todos os campos
campos_chave = [
"cod_materia",
"tip_materia",
"num_materia",
"ano_materia",
"dat_materia",
]
else:
campos_chave = campos_pk_legado
get_id_do_legado = None # type: ignore
# ----------------------------------------------------------------------
# HACK:
@ -1279,6 +1274,22 @@ def migrar_model(model, apagar_do_legado):
# logo podemos desativá-lo
# ----------------------------------------------------------------------
#
# from sapl.legacy.models import Numeracao
# if model_legado == Numeracao:
# # nao usamos cod_numeracao no 3.1 => apelamos p todos os campos
# campos_chave = [
# "cod_materia",
# "tip_materia",
# "num_materia",
# "ano_materia",
# "dat_materia",
# ]
# else:
# campos_chave = campos_pk_legado
#
# renames = field_renames[model]
# campos_velhos_p_novos = {v: k for k, v in renames.items()}
#
# apagados_pelo_usuario = Version.objects.get_deleted(model)
# apagados_pelo_usuario = [
# {k: v for k, v in get_campos_crus_reversion(version).items()}
@ -1409,11 +1420,15 @@ def encontra_conflitos_tipo_autor():
# MIGRATION_ADJUSTMENTS #####################################################
def adjust_acompanhamentomateria(new, old):
def adjust_acompanhamentomateria(
new: AcompanhamentoMateria, old: legacy_models.AcompMateria
):
new.confirmado = True
def adjust_documentoadministrativo(new, old):
def adjust_documentoadministrativo(
new: DocumentoAdministrativo, old: legacy_models.DocumentoAdministrativo
):
if old.num_protocolo:
numero, ano = old.num_protocolo, new.ano
# False < True => o primeiro será o protocolo não anulado
@ -1448,7 +1463,7 @@ Colocamos então o número de protocolo no campo "número externo".
new.observacao += ("\n\n" if new.observacao else "") + nota
def adjust_mandato(new, old):
def adjust_mandato(new: Mandato, old: legacy_models.Mandato):
if old.dat_fim_mandato:
new.data_fim_mandato = old.dat_fim_mandato
if not new.data_fim_mandato:
@ -1460,7 +1475,7 @@ def adjust_mandato(new, old):
new.data_fim_mandato = new.legislatura.data_fim
def adjust_ordemdia_antes_salvar(new, old):
def adjust_ordemdia_antes_salvar(new: OrdemDia, old: legacy_models.OrdemDia):
new.votacao_aberta = False
if not old.tip_votacao:
@ -1476,7 +1491,7 @@ def adjust_ordemdia_antes_salvar(new, old):
)
def adjust_parlamentar(new, old):
def adjust_parlamentar(new: Parlamentar, old: legacy_models.Parlamentar):
if old.ind_unid_deliberativa:
value = new.unidade_deliberativa
# Field is defined as not null in legacy db,
@ -1505,7 +1520,7 @@ def adjust_parlamentar(new, old):
new.municipio_residencia, new.uf_residencia = municipio_uf[0]
def adjust_participacao(new, old):
def adjust_participacao(new: Participacao, old: legacy_models.ComposicaoComissao):
comissao_id, periodo_id = [
get_fk_related(Composicao._meta.get_field(name), old)
for name in ("comissao", "periodo")
@ -1518,11 +1533,13 @@ def adjust_participacao(new, old):
new.composicao = composicao
def adjust_normarelacionada(new, old):
def adjust_normarelacionada(
new: NormaRelacionada, old: legacy_models.VinculoNormaJuridica
):
new.tipo_vinculo = TipoVinculoNormaJuridica.objects.get(sigla=old.tip_vinculo)
def adjust_protocolo_antes_salvar(new, old):
def adjust_protocolo_antes_salvar(new: Protocolo, old: legacy_models.Protocolo):
if new.numero is None:
new.numero = old.cod_protocolo
warn(
@ -1548,7 +1565,9 @@ def get_como_resolver_registro_votacao_ambiguo():
return {}
def adjust_registrovotacao_antes_salvar(new, old):
def adjust_registrovotacao_antes_salvar(
new: RegistroVotacao, old: legacy_models.RegistroVotacao
):
ordem_dia = OrdemDia.objects.filter(pk=old.cod_ordem, materia=old.cod_materia)
expediente_materia = ExpedienteMateria.objects.filter(
pk=old.cod_ordem, materia=old.cod_materia
@ -1561,7 +1580,7 @@ def adjust_registrovotacao_antes_salvar(new, old):
# registro de votação ambíguo
if ordem_dia and expediente_materia:
como_resolver = get_como_resolver_registro_votacao_ambiguo()
campo = como_resolver[new.id]
campo = como_resolver[new.pk]
if campo.startswith("ordem"):
new.ordem = ordem_dia[0]
elif campo.startswith("expediente"):
@ -1571,12 +1590,12 @@ def adjust_registrovotacao_antes_salvar(new, old):
"""
Registro de Votação ambíguo: {}
Resolva criando o arquivo {}""".format(
new.id, get_arquivo_resolve_registro_votacao()
new.pk, get_arquivo_resolve_registro_votacao()
)
)
def adjust_tipoafastamento(new, old):
def adjust_tipoafastamento(new: TipoAfastamento, old: legacy_models.TipoAfastamento):
assert xor(old.ind_afastamento, old.ind_fim_mandato)
if old.ind_afastamento:
new.indicador = "A"
@ -1592,14 +1611,14 @@ def set_generic_fk(new, campo_virtual, old):
new.object_id = get_fk_related(campo_virtual, old)
def adjust_tipoproposicao(new, old):
def adjust_tipoproposicao(new: TipoProposicao, old: legacy_models.TipoProposicao):
"Aponta para o tipo relacionado de matéria ou documento"
if old.tip_mat_ou_doc is not None:
campo_virtual = CAMPOS_VIRTUAIS_TIPO_PROPOSICAO[old.ind_mat_ou_doc]
set_generic_fk(new, campo_virtual, old)
def adjust_proposicao_antes_salvar(new, old):
def adjust_proposicao_antes_salvar(new: Proposicao, old: legacy_models.Proposicao):
if new.data_envio:
new.ano = new.data_envio.year
if old.cod_mat_ou_doc is not None:
@ -1608,7 +1627,7 @@ def adjust_proposicao_antes_salvar(new, old):
set_generic_fk(new, campo_virtual, old)
def adjust_statustramitacao(new, old):
def adjust_statustramitacao(new: StatusTramitacao, old: legacy_models.StatusTramitacao):
if old.ind_fim_tramitacao:
new.indicador = "F"
elif old.ind_retorno_tramitacao:
@ -1617,16 +1636,20 @@ def adjust_statustramitacao(new, old):
new.indicador = ""
def adjust_statustramitacaoadm(new, old):
adjust_statustramitacao(new, old)
def adjust_statustramitacaoadm(
new: StatusTramitacaoAdministrativo,
old: legacy_models.StatusTramitacaoAdministrativo,
):
# tipagem: StatusTramitacaoAdministrativo é uma cópia de StatusTramitacao
adjust_statustramitacao(new, old) # type: ignore
def adjust_tramitacao(new, old):
def adjust_tramitacao(new: Tramitacao, old: legacy_models.Tramitacao):
if old.sgl_turno == "Ú":
new.turno = "U"
def adjust_tipo_autor(new, old):
def adjust_tipo_autor(new: TipoAutor, old: legacy_models.TipoAutor):
model_apontado = normalize(new.descricao.lower()).replace(" ", "")
content_types = ContentType.objects.filter(model=model_apontado).exclude(
app_label="legacy"
@ -1635,7 +1658,9 @@ def adjust_tipo_autor(new, old):
new.content_type = content_types[0] if content_types else None
def adjust_normajuridica_antes_salvar(new, old):
def adjust_normajuridica_antes_salvar(
new: NormaJuridica, old: legacy_models.NormaJuridica
):
# Ajusta choice de esfera_federacao
# O 'S' vem de 'Selecionar'. Na versão antiga do SAPL, quando uma opção do
# combobox era selecionada, o sistema pegava a primeira letra da seleção,
@ -1716,7 +1741,7 @@ def adjust_autor(new: Autor, old: legacy_models.Autor):
new.operadores.add(user)
def adjust_comissao(new, old):
def adjust_comissao(new: Comissao, old: legacy_models.Comissao):
if not old.dat_extincao and not old.dat_fim_comissao:
new.ativa = True
elif (
@ -1730,7 +1755,9 @@ def adjust_comissao(new, old):
new.ativa = False
def adjust_tiporesultadovotacao(new, old):
def adjust_tiporesultadovotacao(
new: TipoResultadoVotacao, old: legacy_models.TipoResultadoVotacao
):
if "aprova" in new.nome.lower():
new.natureza = TipoResultadoVotacao.NATUREZA_CHOICES.aprovado
elif "rejeita" in new.nome.lower():
@ -1755,7 +1782,7 @@ def str_to_time(fonte):
return tempo.time() if tempo else None
def adjust_reuniao_comissao(new, old):
def adjust_reuniao_comissao(new: Reuniao, old: legacy_models.ReuniaoComissao):
new.hora_inicio = str_to_time(old.hr_inicio_reuniao)
@ -1776,7 +1803,7 @@ def get_mesa_diretora(cod_sessao_leg):
return mesa
def adjust_composicao_mesa(new: ComposicaoMesa, old):
def adjust_composicao_mesa(new: ComposicaoMesa, old: legacy_models.ComposicaoMesa):
new.mesa_diretora = get_mesa_diretora(old.cod_sessao_leg)
@ -1787,11 +1814,13 @@ def remove_style(conteudo):
soup = BeautifulSoup(conteudo, "html.parser")
for tag in soup.recursiveChildGenerator():
if hasattr(tag, "attrs"):
tag.attrs = {k: v for k, v in tag.attrs.items() if k != "style"}
tag.attrs = {k: v for k, v in tag.attrs.items() if k != "style"} # type: ignore
return str(soup)
def adjust_expediente_sessao(new, old):
def adjust_expediente_sessao(
new: ExpedienteSessao, old: legacy_models.ExpedienteSessaoPlenaria
):
new.conteudo = remove_style(new.conteudo)
@ -1898,7 +1927,7 @@ def gravar_marco(
if versiona:
REPO.git.add([dir_dados.name])
if gera_backup:
REPO.git.add([arq_backup.name])
REPO.git.add([arq_backup.name]) # type: ignore
if "master" not in REPO.heads or REPO.index.diff("HEAD"):
# se de fato existe mudança
REPO.index.commit(f"Grava marco (em {nome_dir})")

Loading…
Cancel
Save