mirror of https://github.com/interlegis/sapl.git
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
411 lines
14 KiB
411 lines
14 KiB
from collections import OrderedDict
|
|
import importlib
|
|
import inspect
|
|
import logging
|
|
import re
|
|
|
|
from django.apps.config import AppConfig
|
|
from django.apps.registry import apps
|
|
from django.conf import settings
|
|
from django.contrib.postgres.fields.jsonb import JSONField
|
|
from django.db.models.base import ModelBase
|
|
from django.db.models.fields import TextField, CharField
|
|
from django.db.models.fields.files import FileField
|
|
from django.template.defaultfilters import capfirst
|
|
from django.utils.translation import gettext_lazy as _
|
|
import django_filters
|
|
from django_filters.constants import ALL_FIELDS, EMPTY_VALUES
|
|
from django_filters.filters import CharFilter
|
|
from django_filters.filterset import FilterSet
|
|
from django_filters.rest_framework.backends import DjangoFilterBackend
|
|
from django_filters.utils import resolve_field, get_all_model_fields
|
|
from rest_framework import serializers as rest_serializers
|
|
from rest_framework.response import Response
|
|
from rest_framework.routers import DefaultRouter
|
|
from rest_framework.viewsets import ModelViewSet
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class SplitStringCharFilter(django_filters.CharFilter):
|
|
_re = re.compile(r'("[^"]+"| +|[^"]+)')
|
|
|
|
def filter(self, qs, value):
|
|
if value in EMPTY_VALUES:
|
|
return qs
|
|
if self.distinct:
|
|
qs = qs.distinct()
|
|
lookup = '%s__%s' % (self.field_name, self.lookup_expr)
|
|
|
|
values = [value]
|
|
if self.lookup_expr == 'icontains':
|
|
if not '"' in value:
|
|
values = value.split(' ')
|
|
else:
|
|
values = list(
|
|
filter(
|
|
lambda x: x and x != ' ' and x[0] != '"',
|
|
self._re.findall(value)
|
|
)
|
|
) + list(
|
|
map(
|
|
lambda x: x[1:-1],
|
|
filter(
|
|
lambda x: x and x[0] == '"',
|
|
self._re.findall(value)
|
|
)
|
|
)
|
|
)
|
|
|
|
if not isinstance(values, list):
|
|
values = [values]
|
|
for v in values:
|
|
qs = self.get_method(qs)(**{lookup: v})
|
|
return qs
|
|
|
|
|
|
class ApiFilterSetMixin(FilterSet):
|
|
|
|
o = CharFilter(method='filter_o')
|
|
|
|
class Meta:
|
|
fields = '__all__'
|
|
filter_overrides = {
|
|
FileField: {
|
|
'filter_class': django_filters.CharFilter,
|
|
'extra': lambda f: {
|
|
'lookup_expr': 'exact',
|
|
},
|
|
},
|
|
CharField: {
|
|
'filter_class': SplitStringCharFilter,
|
|
},
|
|
TextField: {
|
|
'filter_class': SplitStringCharFilter,
|
|
},
|
|
JSONField: {
|
|
'filter_class': django_filters.CharFilter,
|
|
'extra': lambda f: {
|
|
'lookup_expr': 'exact',
|
|
},
|
|
},
|
|
}
|
|
|
|
def filter_o(self, queryset, name, value):
|
|
try:
|
|
return queryset.order_by(
|
|
*map(str.strip, value.split(',')))
|
|
except:
|
|
return queryset
|
|
|
|
@classmethod
|
|
def get_fields(cls):
|
|
model = cls._meta.model
|
|
fields_model = get_all_model_fields(model)
|
|
fields_filter = cls._meta.fields
|
|
exclude = cls._meta.exclude
|
|
|
|
if exclude is not None and fields_filter is None:
|
|
fields_filter = ALL_FIELDS
|
|
|
|
fields = fields_filter if isinstance(fields_filter, dict) else {}
|
|
|
|
for f_str in fields_model:
|
|
if f_str not in fields:
|
|
|
|
f = model._meta.get_field(f_str)
|
|
|
|
if f.many_to_many:
|
|
fields[f_str] = ['exact']
|
|
continue
|
|
|
|
fields[f_str] = ['exact']
|
|
|
|
def get_keys_lookups(cl, sub_f):
|
|
r = []
|
|
for lk, lv in cl.items():
|
|
|
|
if lk in ('contained_by', 'trigram_similar', 'unaccent', 'search'):
|
|
continue
|
|
|
|
sflk = f'{sub_f}{"__" if sub_f else ""}{lk}'
|
|
r.append(sflk)
|
|
|
|
if hasattr(lv, 'get_lookups'):
|
|
r += get_keys_lookups(lv.get_lookups(), sflk)
|
|
|
|
if hasattr(lv, 'output_field') and hasattr(lv, 'output_field.get_lookups'):
|
|
r.append(f'{sflk}{"__" if sflk else ""}range')
|
|
|
|
r += get_keys_lookups(lv.output_field.class_lookups, sflk)
|
|
|
|
return r
|
|
|
|
fields[f_str] = list(
|
|
set(fields[f_str] + get_keys_lookups(f.get_lookups(), '')))
|
|
|
|
# Remove excluded fields
|
|
exclude = exclude or []
|
|
|
|
fields = [(f, lookups)
|
|
for f, lookups in fields.items() if f not in exclude]
|
|
|
|
return OrderedDict(fields)
|
|
|
|
@classmethod
|
|
def filter_for_field(cls, f, name, lookup_expr='exact'):
|
|
# Redefine método estático para ignorar filtro para
|
|
# fields que não possuam lookup_expr informado
|
|
|
|
f, lookup_type = resolve_field(f, lookup_expr)
|
|
|
|
default = {
|
|
'field_name': name,
|
|
'label': capfirst(f.verbose_name),
|
|
'lookup_expr': lookup_expr
|
|
}
|
|
|
|
filter_class, params = cls.filter_for_lookup(
|
|
f, lookup_type)
|
|
default.update(params)
|
|
if filter_class is not None:
|
|
return filter_class(**default)
|
|
return None
|
|
|
|
|
|
class BusinessRulesNotImplementedMixin:
|
|
http_method_names = ['get', 'head', 'options', 'trace']
|
|
|
|
def create(self, request, *args, **kwargs):
|
|
raise Exception(_("POST Create não implementado"))
|
|
|
|
def update(self, request, *args, **kwargs):
|
|
raise Exception(_("PUT and PATCH não implementado"))
|
|
|
|
def delete(self, request, *args, **kwargs):
|
|
raise Exception(_("DELETE Delete não implementado"))
|
|
|
|
|
|
class ApiViewSetConstrutor():
|
|
|
|
_built_sets = {}
|
|
|
|
class ApiViewSet(ModelViewSet):
|
|
filter_backends = (DjangoFilterBackend,)
|
|
|
|
@classmethod
|
|
def get_viewset_for_model(cls, model):
|
|
return cls._built_sets[model._meta.app_config][model]
|
|
|
|
@classmethod
|
|
def update(cls, other):
|
|
cls._built_sets.update(other._built_sets)
|
|
|
|
@classmethod
|
|
def import_modules(cls, modules):
|
|
for m in modules:
|
|
importlib.import_module(m)
|
|
|
|
@classmethod
|
|
def router(cls, router_class=DefaultRouter):
|
|
router = router_class()
|
|
for app, built_sets in cls._built_sets.items():
|
|
for model, viewset in built_sets.items():
|
|
router.register(
|
|
f'{app.label}/{model._meta.model_name}',
|
|
viewset,
|
|
basename=f'{app.label}_{model._meta.model_name}')
|
|
return router
|
|
|
|
@classmethod
|
|
def build_class(cls, apps_or_models):
|
|
|
|
DRFAUTOAPI = settings.DRFAUTOAPI
|
|
|
|
serializers_classes = {}
|
|
filters_classes = {}
|
|
|
|
global_serializer_mixin = rest_serializers.ModelSerializer
|
|
global_filter_class = ApiFilterSetMixin
|
|
|
|
try:
|
|
if DRFAUTOAPI:
|
|
if 'DEFAULT_SERIALIZER_MODULE' in DRFAUTOAPI:
|
|
serializers = importlib.import_module(
|
|
DRFAUTOAPI['DEFAULT_SERIALIZER_MODULE']
|
|
)
|
|
serializers_classes = inspect.getmembers(serializers)
|
|
serializers_classes = {i[0]: i[1] for i in filter(
|
|
lambda x: x[0].endswith('Serializer'),
|
|
serializers_classes
|
|
)}
|
|
|
|
if 'DEFAULT_FILTER_MODULE' in DRFAUTOAPI:
|
|
filters = importlib.import_module(
|
|
DRFAUTOAPI['DEFAULT_FILTER_MODULE']
|
|
)
|
|
filters_classes = inspect.getmembers(filters)
|
|
filters_classes = {i[0]: i[1] for i in filter(
|
|
lambda x: x[0].endswith('FilterSet'),
|
|
filters_classes
|
|
)}
|
|
|
|
if 'GLOBAL_SERIALIZER_MIXIN' in DRFAUTOAPI:
|
|
cs = DRFAUTOAPI['GLOBAL_SERIALIZER_MIXIN'].split('.')
|
|
module = importlib.import_module(
|
|
'.'.join(cs[0:-1]))
|
|
global_serializer_mixin = getattr(module, cs[-1])
|
|
|
|
if 'GLOBAL_FILTERSET_MIXIN' in DRFAUTOAPI:
|
|
cs = DRFAUTOAPI['GLOBAL_FILTERSET_MIXIN'].split('.')
|
|
m = importlib.import_module('.'.join(cs[0:-1]))
|
|
global_filter_class = getattr(m, cs[-1])
|
|
|
|
except Exception as e:
|
|
logger.error(e)
|
|
|
|
built_sets = {}
|
|
|
|
def build(_model):
|
|
object_name = _model._meta.object_name
|
|
|
|
serializer_name = f'{object_name}Serializer'
|
|
_serializer_class = serializers_classes.get(
|
|
serializer_name, global_serializer_mixin)
|
|
|
|
filter_name = f'{object_name}FilterSet'
|
|
_filterset_class = filters_classes.get(
|
|
filter_name, global_filter_class)
|
|
|
|
def create_class():
|
|
|
|
_meta_serializer = object if not hasattr(
|
|
_serializer_class, 'Meta') else _serializer_class.Meta
|
|
|
|
class ApiSerializer(_serializer_class):
|
|
|
|
class Meta(_meta_serializer):
|
|
if not hasattr(_meta_serializer, 'ref_name'):
|
|
ref_name = f'{object_name}Serializer'
|
|
|
|
if not hasattr(_meta_serializer, 'model'):
|
|
model = _model
|
|
|
|
if hasattr(_meta_serializer, 'exclude'):
|
|
exclude = _meta_serializer.exclude
|
|
else:
|
|
if not hasattr(_meta_serializer, 'fields'):
|
|
fields = '__all__'
|
|
elif _meta_serializer.fields != '__all__':
|
|
fields = list(_meta_serializer.fields)
|
|
else:
|
|
fields = _meta_serializer.fields
|
|
|
|
_meta_filterset = object if not hasattr(
|
|
_filterset_class, 'Meta') else _filterset_class.Meta
|
|
|
|
class ApiFilterSet(_filterset_class):
|
|
|
|
class Meta(_meta_filterset, ):
|
|
if not hasattr(_meta_filterset, 'model'):
|
|
model = _model
|
|
|
|
class ModelApiViewSet(ApiViewSetConstrutor.ApiViewSet):
|
|
queryset = _model.objects.all()
|
|
filterset_class = ApiFilterSet
|
|
serializer_class = ApiSerializer
|
|
|
|
return ModelApiViewSet
|
|
|
|
viewset = create_class()
|
|
viewset.__name__ = '%sModelViewSet' % _model.__name__
|
|
return viewset
|
|
|
|
for am in apps_or_models:
|
|
|
|
if isinstance(am, ModelBase):
|
|
app = am._meta.app_config
|
|
else:
|
|
app = am
|
|
|
|
if app not in cls._built_sets:
|
|
cls._built_sets[app] = {}
|
|
|
|
if am != app:
|
|
cls._built_sets[app][am] = build(am)
|
|
continue
|
|
|
|
for model in app.get_models():
|
|
cls._built_sets[app][model] = build(model)
|
|
|
|
return cls
|
|
|
|
|
|
# Toda Classe construida acima, pode ser redefinida e aplicado quaisquer
|
|
# das possibilidades para uma classe normal criada a partir de
|
|
# rest_framework.viewsets.ModelViewSet conforme exemplo para a classe autor
|
|
|
|
# decorator que processa um endpoint detail trivial com base no model passado,
|
|
# Um endpoint detail geralmente é um conteúdo baseado numa FK com outros possíveis filtros
|
|
# e os passados pelo proprio cliente, além de o serializer e o filterset
|
|
# ser desse model passado
|
|
|
|
|
|
class wrapper_queryset_response_for_drf_action(object):
|
|
|
|
def __init__(self, model):
|
|
self.model = model
|
|
|
|
def __call__(self, cls):
|
|
|
|
def wrapper(instance_view, *args, **kwargs):
|
|
# recupera a viewset do model anotado
|
|
iv = instance_view
|
|
viewset_from_model = ApiViewSetConstrutor._built_sets[
|
|
self.model._meta.app_config][self.model]
|
|
|
|
# apossa da instancia da viewset mae do action
|
|
# em uma viewset que processa dados do model passado no decorator
|
|
iv.queryset = viewset_from_model.queryset
|
|
iv.serializer_class = viewset_from_model.serializer_class
|
|
iv.filterset_class = viewset_from_model.filterset_class
|
|
|
|
iv.queryset = instance_view.filter_queryset(
|
|
iv.get_queryset())
|
|
|
|
# chama efetivamente o metodo anotado que deve devolver um queryset
|
|
# com os filtros específicos definido pelo programador customizador
|
|
qs = cls(instance_view, *args, **kwargs)
|
|
|
|
page = iv.paginate_queryset(qs)
|
|
data = iv.get_serializer(
|
|
page if page is not None else qs, many=True).data
|
|
|
|
return iv.get_paginated_response(
|
|
data) if page is not None else Response(data)
|
|
|
|
return wrapper
|
|
|
|
|
|
# decorator para recuperar e transformar o default
|
|
class customize(object):
|
|
|
|
def __init__(self, model):
|
|
self.model = model
|
|
|
|
def __call__(self, cls):
|
|
|
|
class _ApiViewSet(
|
|
cls,
|
|
ApiViewSetConstrutor._built_sets[
|
|
self.model._meta.app_config][self.model]
|
|
):
|
|
pass
|
|
|
|
if hasattr(_ApiViewSet, 'build'):
|
|
_ApiViewSet = _ApiViewSet.build()
|
|
|
|
ApiViewSetConstrutor._built_sets[
|
|
self.model._meta.app_config][self.model] = _ApiViewSet
|
|
return _ApiViewSet
|
|
|