Sistema de Apoio ao Processo Legislativo
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

412 lines
14 KiB

from collections import OrderedDict
import importlib
import inspect
import logging
import re
from django.apps.config import AppConfig
from django.apps.registry import apps
from django.conf import settings
from django.db.models.base import ModelBase
from django.db.models.fields import TextField, CharField
from django.db.models.fields.files import FileField
from django.db.models.fields.json import JSONField
from django.template.defaultfilters import capfirst
from django.utils.translation import gettext_lazy as _
import django_filters
from django_filters.constants import ALL_FIELDS, EMPTY_VALUES
from django_filters.filters import CharFilter
from django_filters.filterset import FilterSet
from django_filters.rest_framework.backends import DjangoFilterBackend
from django_filters.utils import resolve_field, get_all_model_fields
from rest_framework import serializers as rest_serializers
from rest_framework.response import Response
from rest_framework.routers import DefaultRouter
from rest_framework.viewsets import ModelViewSet
logger = logging.getLogger(__name__)
class SplitStringCharFilter(django_filters.CharFilter):
_re = re.compile(r'("[^"]+"| +|[^"]+)')
def filter(self, qs, value):
if value in EMPTY_VALUES:
return qs
if self.distinct:
qs = qs.distinct()
lookup = '%s__%s' % (self.field_name, self.lookup_expr)
values = [value]
if self.lookup_expr == 'icontains':
if not '"' in value:
values = value.split(' ')
else:
values = list(
filter(
lambda x: x and x != ' ' and x[0] != '"',
self._re.findall(value)
)
) + list(
map(
lambda x: x[1:-1],
filter(
lambda x: x and x[0] == '"',
self._re.findall(value)
)
)
)
if not isinstance(values, list):
values = [values]
for v in values:
qs = self.get_method(qs)(**{lookup: v})
return qs
class ApiFilterSetMixin(FilterSet):
o = CharFilter(method='filter_o')
class Meta:
fields = '__all__'
filter_overrides = {
FileField: {
'filter_class': django_filters.CharFilter,
'extra': lambda f: {
'lookup_expr': 'exact',
},
},
CharField: {
'filter_class': SplitStringCharFilter,
},
TextField: {
'filter_class': SplitStringCharFilter,
},
JSONField: {
'filter_class': django_filters.CharFilter,
'extra': lambda f: {
'lookup_expr': 'exact',
},
},
}
def filter_o(self, queryset, name, value):
try:
return queryset.order_by(
*map(str.strip, value.split(',')))
except:
return queryset
@classmethod
def get_fields(cls):
model = cls._meta.model
fields_model = get_all_model_fields(model)
fields_filter = cls._meta.fields
exclude = cls._meta.exclude
if exclude is not None and fields_filter is None:
fields_filter = ALL_FIELDS
fields = fields_filter if isinstance(fields_filter, dict) else {}
for f_str in fields_model:
if f_str not in fields:
f = model._meta.get_field(f_str)
if f.many_to_many:
fields[f_str] = ['exact']
continue
fields[f_str] = ['exact']
def get_keys_lookups(cl, sub_f):
r = []
for lk, lv in cl.items():
if lk in ('contained_by', 'trigram_similar', 'unaccent', 'search'):
continue
sflk = f'{sub_f}{"__" if sub_f else ""}{lk}'
r.append(sflk)
if hasattr(lv, 'get_lookups'):
r += get_keys_lookups(lv.get_lookups(), sflk)
if hasattr(lv, 'output_field') and hasattr(lv, 'output_field.get_lookups'):
r.append(f'{sflk}{"__" if sflk else ""}range')
r += get_keys_lookups(lv.output_field.class_lookups, sflk)
return r
fields[f_str] = list(
set(fields[f_str] + get_keys_lookups(f.get_lookups(), '')))
# Remove excluded fields
exclude = exclude or []
fields = [(f, lookups)
for f, lookups in fields.items() if f not in exclude]
return OrderedDict(fields)
@classmethod
def filter_for_field(cls, f, name, lookup_expr='exact'):
# Redefine método estático para ignorar filtro para
# fields que não possuam lookup_expr informado
f, lookup_type = resolve_field(f, lookup_expr)
default = {
'field_name': name,
'label': capfirst(f.verbose_name),
'lookup_expr': lookup_expr
}
filter_class, params = cls.filter_for_lookup(
f, lookup_type)
default.update(params)
if filter_class is not None:
return filter_class(**default)
return None
class BusinessRulesNotImplementedMixin:
http_method_names = ['get', 'head', 'options', 'trace']
def create(self, request, *args, **kwargs):
raise Exception(_("POST Create não implementado"))
def update(self, request, *args, **kwargs):
raise Exception(_("PUT and PATCH não implementado"))
def delete(self, request, *args, **kwargs):
raise Exception(_("DELETE Delete não implementado"))
class ApiViewSetConstrutor():
_built_sets = {}
class ApiViewSet(ModelViewSet):
filter_backends = (DjangoFilterBackend,)
@classmethod
def get_viewset_for_model(cls, model):
return cls._built_sets[model._meta.app_config][model]
@classmethod
def update(cls, other):
cls._built_sets.update(other._built_sets)
@classmethod
def import_modules(cls, modules):
for m in modules:
importlib.import_module(m)
@classmethod
def router(cls, router_class=DefaultRouter):
router = router_class()
for app, built_sets in cls._built_sets.items():
for model, viewset in built_sets.items():
router.register(
f'{app.label}/{model._meta.model_name}',
viewset,
basename=f'{app.label}_{model._meta.model_name}')
return router
@classmethod
def build_class(cls, apps_or_models):
DRFAUTOAPI = settings.DRFAUTOAPI
serializers_classes = {}
filters_classes = {}
global_serializer_mixin = rest_serializers.ModelSerializer
global_filter_class = ApiFilterSetMixin
try:
if DRFAUTOAPI:
if 'DEFAULT_SERIALIZER_MODULE' in DRFAUTOAPI:
serializers = importlib.import_module(
DRFAUTOAPI['DEFAULT_SERIALIZER_MODULE']
)
serializers_classes = inspect.getmembers(serializers)
serializers_classes = {i[0]: i[1] for i in filter(
lambda x: x[0].endswith('Serializer'),
serializers_classes
)}
if 'DEFAULT_FILTER_MODULE' in DRFAUTOAPI:
filters = importlib.import_module(
DRFAUTOAPI['DEFAULT_FILTER_MODULE']
)
filters_classes = inspect.getmembers(filters)
filters_classes = {i[0]: i[1] for i in filter(
lambda x: x[0].endswith('FilterSet'),
filters_classes
)}
if 'GLOBAL_SERIALIZER_MIXIN' in DRFAUTOAPI:
cs = DRFAUTOAPI['GLOBAL_SERIALIZER_MIXIN'].split('.')
module = importlib.import_module(
'.'.join(cs[0:-1]))
global_serializer_mixin = getattr(module, cs[-1])
if 'GLOBAL_FILTERSET_MIXIN' in DRFAUTOAPI:
cs = DRFAUTOAPI['GLOBAL_FILTERSET_MIXIN'].split('.')
m = importlib.import_module('.'.join(cs[0:-1]))
global_filter_class = getattr(m, cs[-1])
except Exception as e:
logger.error(e)
built_sets = {}
def build(_model):
object_name = _model._meta.object_name
serializer_name = f'{object_name}Serializer'
_serializer_class = serializers_classes.get(
serializer_name, global_serializer_mixin)
filter_name = f'{object_name}FilterSet'
_filterset_class = filters_classes.get(
filter_name, global_filter_class)
def create_class():
_meta_serializer = object if not hasattr(
_serializer_class, 'Meta') else _serializer_class.Meta
class ApiSerializer(_serializer_class):
class Meta(_meta_serializer):
if not hasattr(_meta_serializer, 'ref_name'):
ref_name = f'{object_name}Serializer'
if not hasattr(_meta_serializer, 'model'):
model = _model
if hasattr(_meta_serializer, 'exclude'):
exclude = _meta_serializer.exclude
else:
if not hasattr(_meta_serializer, 'fields'):
fields = '__all__'
elif _meta_serializer.fields != '__all__':
fields = list(_meta_serializer.fields)
else:
fields = _meta_serializer.fields
_meta_filterset = object if not hasattr(
_filterset_class, 'Meta') else _filterset_class.Meta
class ApiFilterSet(_filterset_class):
class Meta(_meta_filterset, ):
if not hasattr(_meta_filterset, 'model'):
model = _model
class ModelApiViewSet(ApiViewSetConstrutor.ApiViewSet):
queryset = _model.objects.all()
filterset_class = ApiFilterSet
serializer_class = ApiSerializer
return ModelApiViewSet
viewset = create_class()
viewset.__name__ = '%sModelViewSet' % _model.__name__
return viewset
for am in apps_or_models:
if isinstance(am, ModelBase):
app = am._meta.app_config
else:
app = am
if app not in cls._built_sets:
cls._built_sets[app] = {}
if am != app:
cls._built_sets[app][am] = build(am)
continue
for model in app.get_models():
cls._built_sets[app][model] = build(model)
return cls
# Toda Classe construida acima, pode ser redefinida e aplicado quaisquer
# das possibilidades para uma classe normal criada a partir de
# rest_framework.viewsets.ModelViewSet conforme exemplo para a classe autor
# decorator que processa um endpoint detail trivial com base no model passado,
# Um endpoint detail geralmente é um conteúdo baseado numa FK com outros possíveis filtros
# e os passados pelo proprio cliente, além de o serializer e o filterset
# ser desse model passado
class wrapper_queryset_response_for_drf_action(object):
def __init__(self, model):
self.model = model
def __call__(self, cls):
def wrapper(instance_view, *args, **kwargs):
# recupera a viewset do model anotado
iv = instance_view
viewset_from_model = ApiViewSetConstrutor._built_sets[
self.model._meta.app_config][self.model]
# apossa da instancia da viewset mae do action
# em uma viewset que processa dados do model passado no decorator
iv.queryset = viewset_from_model.queryset
iv.serializer_class = viewset_from_model.serializer_class
iv.filterset_class = viewset_from_model.filterset_class
iv.queryset = instance_view.filter_queryset(
iv.get_queryset())
# chama efetivamente o metodo anotado que deve devolver um queryset
# com os filtros específicos definido pelo programador customizador
qs = cls(instance_view, *args, **kwargs)
page = iv.paginate_queryset(qs)
data = iv.get_serializer(
page if page is not None else qs, many=True).data
return iv.get_paginated_response(
data) if page is not None else Response(data)
return wrapper
# decorator para recuperar e transformar o default
class customize(object):
def __init__(self, model):
self.model = model
def __call__(self, cls):
class _ApiViewSet(
cls,
ApiViewSetConstrutor._built_sets[
self.model._meta.app_config][self.model]
):
pass
if hasattr(_ApiViewSet, 'build'):
_ApiViewSet = _ApiViewSet.build()
ApiViewSetConstrutor._built_sets[
self.model._meta.app_config][self.model] = _ApiViewSet
return _ApiViewSet