
CSV/xlsx/json refactoring

pull/3786/head
Edward Ribeiro 2 months ago
parent commit 450c3ace3b
  1. sapl/base/apps.py (1 changed line)
  2. sapl/materia/views.py (23 changed lines)
  3. sapl/norma/views.py (7 changed lines)
  4. sapl/protocoloadm/views.py (8 changed lines)
  5. sapl/sessao/views.py (42 changed lines)
  6. sapl/utils.py (441 changed lines)

sapl/base/apps.py (1 changed line)

@@ -1,4 +1,3 @@
import django
from django.utils.translation import ugettext_lazy as _

sapl/materia/views.py (23 changed lines)

@@ -2045,22 +2045,29 @@ class MateriaLegislativaPesquisaView(MultiFormatOutputMixin, FilterView):
filterset_class = MateriaLegislativaFilterSet
paginate_by = 50
fields_base_report = [
'id', 'ano', 'numero', 'tipo__sigla', 'tipo__descricao', 'autoria__autor__nome', 'texto_original', 'ementa'
export_fields = [
'id', 'ano', 'numero', 'tipo__sigla', 'tipo__descricao', 'autoria', 'texto_original', 'ementa'
]
fields_report = {
'csv': fields_base_report,
'xlsx': fields_base_report,
'json': fields_base_report,
}
def hook_texto_original(self, obj):
url = self.request.build_absolute_uri('/')[:-1]
texto_original = obj.texto_original if not isinstance(
obj, dict) else obj["texto_original"]
return f'{url}/media/{texto_original}'
def hook_autoria(self, obj):
"""
Hook específico para pegar nomes dos autores (reverse query)
"""
try:
autores = [
str(autoria.autor.nome)
for autoria in obj.autoria_set.select_related('autor').all()
]
return ', '.join(autores)
except AttributeError:
return ''
def get_filterset_kwargs(self, filterset_class):
super().get_filterset_kwargs(filterset_class)
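
For orientation, a minimal sketch (not part of this commit) of the export contract used above: a single export_fields list plus hook_<field> methods replaces the old per-format fields_report dict. The view name is hypothetical; MultiFormatOutputMixin, MateriaLegislativa and the field names come from the diff.

from django.views.generic import ListView

from sapl.materia.models import MateriaLegislativa
from sapl.utils import MultiFormatOutputMixin


class ExemploMateriaExportView(MultiFormatOutputMixin, ListView):
    # Hypothetical view, shown only to illustrate the declaration pattern.
    model = MateriaLegislativa
    queryset_values_for_formats = False  # hand model instances (not dicts) to the hooks
    export_fields = ['id', 'ano', 'numero', 'tipo__sigla', 'autoria', 'ementa']

    def hook_autoria(self, obj):
        # Reverse relations are resolved by a hook_<field> method
        # instead of a '__' lookup on the queryset.
        return ', '.join(
            str(autoria.autor.nome)
            for autoria in obj.autoria_set.select_related('autor').all()
        )

A request with ?format=csv, ?format=xlsx or ?format=json then takes the export path instead of the normal template response.
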

sapl/norma/views.py (7 changed lines)

@@ -152,14 +152,9 @@ class NormaPesquisaView(MultiFormatOutputMixin, FilterView):
filterset_class = NormaFilterSet
paginate_by = 50
fields_base_report = [
export_fields = [
'id', 'ano', 'numero', 'tipo__sigla', 'tipo__descricao', 'texto_integral', 'ementa'
]
fields_report = {
'csv': fields_base_report,
'xlsx': fields_base_report,
'json': fields_base_report,
}
def hook_texto_integral(self, obj):
url = self.request.build_absolute_uri('/')[:-1]

sapl/protocoloadm/views.py (8 changed lines)

@@ -992,7 +992,6 @@ class ProtocoloMateriaView(PermissionRequiredMixin, CreateView):
def get_context_data(self, **kwargs):
context = super(CreateView, self).get_context_data(**kwargs)
autores_ativos = self.autores_ativos()
autores = []
autores.append(['0', '------'])
for a in autores_ativos:
@@ -1042,14 +1041,9 @@ class PesquisarDocumentoAdministrativoView(DocumentoAdministrativoMixin,
paginate_by = 10
permission_required = ('protocoloadm.list_documentoadministrativo', )
fields_base_report = [
export_fields = [
'id', 'ano', 'numero', 'tipo__sigla', 'tipo__descricao', 'assunto'
]
fields_report = {
'csv': fields_base_report,
'xlsx': fields_base_report,
'json': fields_base_report,
}
def get_filterset_kwargs(self, filterset_class):
super(PesquisarDocumentoAdministrativoView,

sapl/sessao/views.py (42 changed lines)

@@ -3813,22 +3813,23 @@ class PautaSessaoDetailView(PautaMultiFormatOutputMixin, DetailView):
template_name = "sessao/pauta_sessao_detail.html"
model = SessaoPlenaria
queryset_values_for_formats = False
export_fields = (
('id', 'ID'),
('periodo', 'Período'),
('titulo', 'Matéria'),
('autor', 'Autor'),
('ementa', 'Ementa'),
('situacao', 'Situação')
)
fields_base_report = [
[('id', 'ID'), ('titulo', 'Matéria'), ('autor', 'Autor'), ('ementa', 'Ementa'), ('situacao', 'Situação')],
[('id', 'ID'), ('titulo', 'Matéria'), ('autor', 'Autor'), ('ementa', 'Ementa'), ('situacao', 'Situação')]
]
fields_report = {
'csv': fields_base_report,
'xlsx': fields_base_report,
'json': fields_base_report,
}
def hook_autor(self, obj):
return ','.join(obj['autor'])
def hook_titulo(self, obj):
return str(obj['titulo'])
item_context = [
('materia_expediente', 'Matérias do Expediente'),
('materias_ordem', 'Matérias da Ordem do Dia')
]
def hook_situacao(self, obj):
return str(obj['situacao'])
def get(self, request, *args, **kwargs):
from sapl.relatorios.views import relatorio_pauta_sessao_weasy # Avoid circular import
@@ -3888,7 +3889,8 @@ class PautaSessaoDetailView(PautaMultiFormatOutputMixin, DetailView):
'situacao': ultima_tramitacao.status if ultima_tramitacao else _("Não informada"),
'processo': f'{str(numeracao.numero_materia)}/{str(numeracao.ano_materia)}' if numeracao else '-',
'autor': [str(x.autor) for x in m.materia.autoria_set.select_related('autor').all()],
'turno': get_turno(ultima_tramitacao.turno) if ultima_tramitacao else ''
'turno': get_turno(ultima_tramitacao.turno) if ultima_tramitacao else '',
'periodo': 'expediente',
})
context.update({'materia_expediente': materias_expediente})
@@ -3972,7 +3974,8 @@ class PautaSessaoDetailView(PautaMultiFormatOutputMixin, DetailView):
'situacao': ultima_tramitacao.status if ultima_tramitacao else _("Não informada"),
'processo': f'{str(numeracao.numero_materia)}/{str(numeracao.ano_materia)}' if numeracao else '-',
'autor': [str(x.autor) for x in Autoria.objects.select_related("autor").filter(materia_id=o.materia_id)],
'turno': get_turno(ultima_tramitacao.turno) if ultima_tramitacao else ''
'turno': get_turno(ultima_tramitacao.turno) if ultima_tramitacao else '',
'periodo': 'ordem dia',
})
context.update({
@@ -3998,14 +4001,9 @@ class PesquisarSessaoPlenariaView(MultiFormatOutputMixin, FilterView):
queryset_values_for_formats = False
fields_base_report = [
export_fields = [
'id', 'data_inicio', 'hora_inicio', 'data_fim', 'hora_fim', '',
]
fields_report = {
'csv': fields_base_report,
'xlsx': fields_base_report,
'json': fields_base_report,
}
def get_filterset_kwargs(self, filterset_class):
super().get_filterset_kwargs(filterset_class)
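
For column labels, two forms appear in this diff, in PautaSessaoDetailView above and in _headers() further down: a (field_name, alias) tuple, or a hook_header_<field> method. A small sketch under those assumptions; the view, labels and field choices are illustrative, while SessaoPlenaria and the mixin are the real classes.

from django.views.generic import ListView

from sapl.sessao.models import SessaoPlenaria
from sapl.utils import MultiFormatOutputMixin


class ExemploRotulosExportView(MultiFormatOutputMixin, ListView):
    # Hypothetical view illustrating header labelling only.
    model = SessaoPlenaria
    queryset_values_for_formats = False
    export_fields = [
        ('data_inicio', 'Data de início'),  # explicit (field_name, alias) pair
        'hora_inicio',                      # label resolved from the model's verbose_name
    ]

    def hook_header_hora_inicio(self):
        # hook_header_<field> takes precedence over the verbose_name lookup
        return 'Hora de início'
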

sapl/utils.py (441 changed lines)

@@ -78,7 +78,6 @@ def is_weak_password(password):
return len(password) < MIN_PASSWORD_LENGTH or not (pwd_has_lowercase and pwd_has_uppercase
and pwd_has_number and pwd_has_special_char)
def groups_remove_user(user, groups_name):
from django.contrib.auth.models import Group
@@ -1169,12 +1168,12 @@ def from_date_to_datetime_utc(data):
class OverwriteStorage(FileSystemStorage):
'''
"""
Solution derived from the gist: https://gist.github.com/fabiomontefuscolo/1584462
Changes Django's default behavior, making it overwrite files with the same
name uploaded by the user instead of renaming them.
'''
"""
def get_available_name(self, name, max_length=None):
if self.exists(name):
@@ -1331,13 +1330,62 @@ def get_path_to_name_report_map():
}
class Row:
def __init__(self, cols, is_header = False):
self.cols = cols
self.is_header = is_header
def __repr__(self):
return f"Row(columns={self.cols}, is_header={self.is_header})"
class Table:
def __init__(self, header = [], rows = []):
self.header = header
self.rows = rows
def add_header(self, header):
if header.is_header:
self.header = header
else:
raise Exception(f"Row {header} is not header!")
def append(self, row):
if not row.is_header:
self.rows.append(row)
else:
raise Exception(f"Row {row} is header!")
def to_list(self):
return [self.header.cols] + [r.cols for r in self.rows]
def size(self):
return len(self.rows)
def __repr__(self):
return f"Table(Header={self.header}, Rows={self.rows})"
class MultiFormatOutputMixin:
formats_impl = 'csv', 'xlsx', 'json'
formats_impl = ['csv', 'xlsx', 'json']
queryset_values_for_formats = True
def render_to_response(self, context, **response_kwargs):
export_fields = ()
fields_by_format = None
def get_export_fields(self):
names = getattr(self, "export_fields", []) or []
return names
def get_fields_by_format(self):
provided = getattr(self, "fields_by_format", None)
if provided:
return {fmt: v for fmt, v in provided.items()}
fields = self.get_export_fields()
return {fmt: fields for fmt in self.formats_impl}
def render_to_response(self, context, **response_kwargs):
format_result = getattr(self.request, self.request.method).get(
'format', None)
@@ -1356,183 +1404,157 @@ class MultiFormatOutputMixin:
return super().render_to_response(context, **response_kwargs)
def render_to_json(self, context):
object_list = context['object_list']
if self.queryset_values_for_formats:
object_list = object_list.values(
*self.fields_report['json'])
data = []
for obj in object_list:
wr = list(self._write_row(obj, self.fields_report['json']))
if not data:
data.append([wr])
continue
if wr[0] != data[-1][0][0]:
data.append([wr])
else:
data[-1].append(wr)
for mri, multirows in enumerate(data):
if len(multirows) == 1:
v = multirows[0]
else:
v = multirows[0]
for ri, cols in enumerate(multirows[1:]):
for rc, cell in enumerate(cols):
if v[rc] != cell:
v[rc] = f'{v[rc]}\r\n{cell}'
fmt_map = self.get_fields_by_format()
export_fields = fmt_map['json']
rows = [Row(cols=self._extract_row(obj, export_fields)) for obj in object_list]
table = Table(header=Row(export_fields, is_header=True), rows=rows)
data[mri] = dict(
map(lambda i, j: (i, j), self.fields_report['json'], v))
headers = self._headers(export_fields)
if isinstance(export_fields, tuple):
export_fields = [f[0] for f in export_fields]
json_metadata = {
'headers': dict(
map(lambda i, j: (i, j), self.fields_report['json'], self._headers(self.fields_report['json']))),
'results': data
'headers': dict(zip(export_fields, headers)),
'results': [dict(zip(export_fields, row.cols)) for row in table.rows],
}
response = JsonResponse(json_metadata)
response['Content-Disposition'] = f'attachment; filename="sapl_{self.request.resolver_match.url_name}.json"'
response['Cache-Control'] = 'no-cache'
response['Pragma'] = 'no-cache'
response['Expires'] = 0
response = self._set_response_params(JsonResponse(json_metadata), 'json')
return response
def render_to_csv(self, context):
response = HttpResponse(content_type='text/csv')
response['Content-Disposition'] = f'attachment; filename="sapl_{self.request.resolver_match.url_name}.csv"'
response['Cache-Control'] = 'no-cache'
response['Pragma'] = 'no-cache'
response['Expires'] = 0
writer = csv.writer(response, delimiter=";",
quoting=csv.QUOTE_NONNUMERIC)
object_list = context['object_list']
if self.queryset_values_for_formats:
object_list = object_list.values(
*self.fields_report['csv'])
data = [[list(self._headers(self.fields_report['csv']))], ]
for obj in object_list:
wr = list(self._write_row(obj, self.fields_report['csv']))
if wr[0] != data[-1][0][0]:
data.append([wr])
else:
data[-1].append(wr)
for mri, multirows in enumerate(data):
if len(multirows) == 1:
writer.writerow(multirows[0])
else:
v = multirows[0]
for ri, cols in enumerate(multirows[1:]):
for rc, cell in enumerate(cols):
if v[rc] != cell:
v[rc] = f'{v[rc]}\r\n{cell}'
writer.writerow(v)
fmt_map = self.get_fields_by_format()
export_fields = fmt_map['csv']
rows = [Row(self._extract_row(obj, export_fields)) for obj in object_list]
table = Table(header=Row(self._headers(export_fields), is_header=True), rows=rows)
response = self._set_response_params(HttpResponse(content_type='text/csv'), 'csv')
writer = csv.writer(response,
delimiter=";",
quoting=csv.QUOTE_NONNUMERIC)
writer.writerows(table.to_list())
return response
def render_to_xlsx(self, context):
object_list = context['object_list']
if self.queryset_values_for_formats:
object_list = object_list.values(
*self.fields_report['xlsx'])
data = [[list(self._headers(self.fields_report['xlsx']))], ]
for obj in object_list:
wr = list(self._write_row(obj, self.fields_report['xlsx']))
if wr[0] != data[-1][0][0]:
data.append([wr])
else:
data[-1].append(wr)
fmt_map = self.get_fields_by_format()
field_names = fmt_map['xlsx']
rows = [Row(self._extract_row(obj, field_names)) for obj in object_list]
table = Table(header=Row(self._headers(field_names), is_header=True), rows=rows)
output = io.BytesIO()
wb = Workbook(output, {'in_memory': True})
ws = wb.add_worksheet()
for mri, multirows in enumerate(data):
if len(multirows) == 1:
for rc, cell in enumerate(multirows[0]):
ws.write(mri, rc, cell)
else:
v = multirows[0]
for ri, cols in enumerate(multirows[1:]):
for rc, cell in enumerate(cols):
if v[rc] != cell:
v[rc] = f'{v[rc]}\r\n{cell}'
for rc, cell in enumerate(v):
ws.write(mri, rc, cell)
for row_idx, row in enumerate(table.to_list()):
for cell_idx, cell in enumerate(row):
ws.write(row_idx, cell_idx, cell)
ws.autofit()
wb.close()
output.seek(0)
response = HttpResponse(output.read(
), content_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")
response['Content-Disposition'] = f'attachment; filename="sapl_{self.request.resolver_match.url_name}.xlsx"'
response['Cache-Control'] = 'no-cache'
response['Pragma'] = 'no-cache'
response['Expires'] = 0
response = self._set_response_params(
HttpResponse(output.read(),
content_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"),
'xlsx'
)
output.close()
return response
def _write_row(self, obj, fields_report):
for fname in fields_report:
def _extract_row(self, obj, columns):
"""Versão mais simples que não chama métodos"""
for col_name in columns:
if isinstance(col_name, tuple):
col_name = col_name[0]
if type(fname) is tuple:
fname = fname[0]
if hasattr(self, f'hook_{fname}'):
v = getattr(self, f'hook_{fname}')(obj)
yield v
# Custom hook
if hasattr(self, f'hook_{col_name}'):
value = getattr(self, f'hook_{col_name}')(obj)
yield value
continue
# Dictionary
if isinstance(obj, dict):
yield obj[fname]
yield obj.get(col_name, '')
continue
fname = fname.split('__')
v = obj
for fp in fname:
v = getattr(v, fp)
# Navigation WITHOUT calling callables
try:
field_parts = col_name.split('__')
value = obj
for part in field_parts:
if value is None:
value = ''
break
value = getattr(value, part)
# We do NOT call callables - we only read the attribute
# Handling of relationships
if hasattr(value, 'all'):
items = value.all()
value = ' - '.join(str(item) for item in items) if items.exists() else ''
yield str(value) if value is not None else ''
except AttributeError:
# Variations for reverse relationships
base_field = col_name.split('__')[0]
variations = [f'{base_field}_set', f'{base_field}s']
found = False
for variation in variations:
try:
new_field = col_name.replace(base_field, variation, 1)
field_parts = new_field.split('__')
value = obj
for part in field_parts:
value = getattr(value, part)
if hasattr(value, 'all'):
items = value.all()
value = ' - '.join(str(item) for item in items) if items.exists() else ''
yield str(value) if value is not None else ''
found = True
break
except AttributeError:
continue
if hasattr(v, 'all'):
v = ' - '.join(map(lambda x: str(x), v.all()))
if not found:
yield ''
yield v
def _set_response_params(self, response, extension):
response[
'Content-Disposition'] = f'attachment; filename="sapl_{self.request.resolver_match.url_name}.{extension}"'
response['Cache-Control'] = 'no-cache'
response['Pragma'] = 'no-cache'
response['Expires'] = 0
return response
def _headers(self, fields_report):
def _headers(self, field_names):
for fname in fields_report:
for fname in field_names:
verbose_name = []
if hasattr(self, f'hook_header_{fname}'):
if hasattr(self, f'hook_header_{fname}'): # supports extending functionality
h = getattr(self, f'hook_header_{fname}')()
yield h
continue
if type(fname) is tuple:
verbose_name.append(fname[1])
verbose_name.append(fname[1]) # supports (field_name, alias)
else:
fname = fname.split('__')
# this will not always work, since the base model
# may be different. Example: the pauta uses SessaoPlenaria,
# but we return fields from MateriaLegislativa
m = self.model
for fp in fname:
@@ -1554,156 +1576,17 @@ class MultiFormatOutputMixin:
class PautaMultiFormatOutputMixin(MultiFormatOutputMixin):
def render_to_csv(self, context):
response = HttpResponse(content_type='text/csv')
response['Content-Disposition'] = f'attachment; filename="sapl_{self.request.resolver_match.url_name}.csv"'
response['Cache-Control'] = 'no-cache'
response['Pragma'] = 'no-cache'
response['Expires'] = 0
writer = csv.writer(response, delimiter=";",
quoting=csv.QUOTE_NONNUMERIC)
writer.writerow(['Pauta da ' + str(context['sessaoplenaria'])])
writer.writerow('')
for item in self.item_context:
if item[0] in context:
index = self.item_context.index(item)
writer.writerow([self.item_context[index][1]])
data = [[list(self._headers(self.fields_report['csv'][index]))], ]
for obj in context.get(item[0]):
wr = list(self._write_row(obj, self.fields_report['csv'][index]))
if wr[0] != data[-1][0][0]:
data.append([wr])
else:
data[-1].append(wr)
for mri, multirows in enumerate(data):
if len(multirows) == 1:
writer.writerow(multirows[0])
else:
v = multirows[0]
for ri, cols in enumerate(multirows[1:]):
for rc, cell in enumerate(cols):
if v[rc] != cell:
v[rc] = f'{v[rc]}\r\n{cell}'
writer.writerow(v)
writer.writerow('')
def __mutate_context(self, context):
context_materias = {
'object_list': context.get('materias_ordem', []) + context.get('materia_expediente', [])
}
return context_materias
return response
def render_to_csv(self, context):
return super().render_to_csv(self.__mutate_context(context))
def render_to_json(self, context):
json_metadata = {'sessaoplenaria': str(context['sessaoplenaria'])}
for item in self.item_context:
if item[0] in context:
index = self.item_context.index(item)
json_metadata.update({item[0]: {}})
data = []
for obj in context.get(item[0]):
wr = list(self._write_row(obj, self.fields_report['json'][index]))
if not data:
data.append([wr])
continue
if wr[0] != data[-1][0][0]:
data.append([wr])
else:
data[-1].append(wr)
for mri, multirows in enumerate(data):
if len(multirows) == 1:
try:
v = multirows[0]
except TypeError:
v = str(multirows[0])
else:
try:
v = str(multirows[0])
except TypeError:
v = multirows[0]
for ri, cols in enumerate(multirows[1:]):
for rc, cell in enumerate(cols):
if v[rc] != cell:
v[rc] = f'{v[rc]}\r\n{cell}'
data[mri] = dict(
map(lambda i, j: (i[0], j if type(j) in [str, int, list] else str(j)),
self.fields_report['json'][index], v))
json_metadata.update({item[0]: {
'headers': dict(
map(lambda i, j: (i[0], j), self.fields_report['json'][index],
self._headers(self.fields_report['json'][index]))),
'results': data}
})
response = JsonResponse(json_metadata)
response['Content-Disposition'] = f'attachment; filename="sapl_{self.request.resolver_match.url_name}.json"'
response['Cache-Control'] = 'no-cache'
response['Pragma'] = 'no-cache'
response['Expires'] = 0
return response
return super().render_to_json(self.__mutate_context(context))
def render_to_xlsx(self, context):
output = io.BytesIO()
wb = Workbook(output, {'in_memory': True})
ws = wb.add_worksheet()
ws.write('A1', 'Pauta da ' + str(context['sessaoplenaria']))
row = 2
for item in self.item_context:
if item[0] in context:
index = self.item_context.index(item)
ws.write(row, 0, self.item_context[index][1])
row += 1
data = [[list(self._headers(self.fields_report['xlsx'][index]))], ]
for obj in context.get(item[0]):
wr = list(self._write_row(obj, self.fields_report['xlsx'][index]))
if wr[0] != data[-1][0][0]:
data.append([wr])
else:
data[-1].append(wr)
for mri, multirows in enumerate(data):
if len(multirows) == 1:
for rc, cell in enumerate(multirows[0]):
try:
ws.write(row, rc, cell)
except TypeError:
ws.write(row, rc, str(cell))
row += 1
else:
v = multirows[0]
for ri, cols in enumerate(multirows[1:]):
for rc, cell in enumerate(cols):
if v[rc] != cell:
v[rc] = f'{v[rc]}\r\n{cell}'
for rc, cell in enumerate(v):
ws.write(row, rc, cell)
row += 1
row += 1
ws.autofit()
wb.close()
output.seek(0)
response = HttpResponse(output.read(
), content_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")
response['Content-Disposition'] = f'attachment; filename="sapl_{self.request.resolver_match.url_name}.xlsx"'
response['Cache-Control'] = 'no-cache'
response['Pragma'] = 'no-cache'
response['Expires'] = 0
output.close()
return response
return super().render_to_xlsx(self.__mutate_context(context))
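
Tying it together, a hedged sketch of how a view can vary the exported columns per format and how an export is requested. The view name and URL are illustrative; fields_by_format, formats_impl, the ?format= parameter and the fallback in get_fields_by_format() are taken from the mixin above.

from django.views.generic import ListView

from sapl.norma.models import NormaJuridica
from sapl.utils import MultiFormatOutputMixin


class ExemploNormaExportView(MultiFormatOutputMixin, ListView):
    # Hypothetical view; only the mixin behaviour is taken from the diff.
    model = NormaJuridica
    export_fields = ['id', 'ano', 'numero', 'ementa']
    # Optional per-format override; without it, get_fields_by_format()
    # repeats export_fields for every entry in formats_impl.
    fields_by_format = {
        'csv': ['id', 'ano', 'numero', 'ementa'],
        'xlsx': ['id', 'ano', 'numero', 'ementa'],
        'json': ['id', 'numero', 'ementa'],
    }

# GET <list-url>?format=csv   -> render_to_csv()  (semicolon-delimited, QUOTE_NONNUMERIC)
# GET <list-url>?format=xlsx  -> render_to_xlsx() (in-memory Workbook)
# GET <list-url>?format=json  -> render_to_json() ({'headers': {...}, 'results': [...]})
# no format parameter         -> the normal template response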
