mirror of https://github.com/interlegis/sapl.git
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
413 lines
14 KiB
413 lines
14 KiB
from collections import OrderedDict
|
|
import importlib
|
|
import inspect
|
|
import logging
|
|
import re
|
|
|
|
from django.apps.config import AppConfig
|
|
from django.apps.registry import apps
|
|
from django.conf import settings
|
|
from django.db.models import JSONField
|
|
from django.db.models.base import ModelBase
|
|
from django.db.models.fields import TextField, CharField
|
|
from django.db.models.fields.files import FileField
|
|
from django.template.defaultfilters import capfirst
|
|
from django.utils.translation import gettext_lazy as _
|
|
import django_filters
|
|
from django_filters.constants import ALL_FIELDS, EMPTY_VALUES
|
|
from django_filters.filters import CharFilter
|
|
from django_filters.filterset import FilterSet
|
|
from django_filters.rest_framework.backends import DjangoFilterBackend
|
|
from django_filters.utils import resolve_field, get_all_model_fields
|
|
from rest_framework import serializers as rest_serializers
|
|
from rest_framework.response import Response
|
|
from rest_framework.routers import DefaultRouter
|
|
from rest_framework.viewsets import ModelViewSet
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class SplitStringCharFilter(django_filters.CharFilter):
|
|
_re = re.compile(r'("[^"]+"| +|[^"]+)')
|
|
|
|
def filter(self, qs, value):
|
|
if value in EMPTY_VALUES:
|
|
return qs
|
|
if self.distinct:
|
|
qs = qs.distinct()
|
|
lookup = "%s__%s" % (self.field_name, self.lookup_expr)
|
|
|
|
values = [value]
|
|
if self.lookup_expr == "icontains":
|
|
if not '"' in value:
|
|
values = value.split(" ")
|
|
else:
|
|
values = list(
|
|
filter(
|
|
lambda x: x and x != " " and x[0] != '"',
|
|
self._re.findall(value),
|
|
)
|
|
) + list(
|
|
map(
|
|
lambda x: x[1:-1],
|
|
filter(lambda x: x and x[0] == '"', self._re.findall(value)),
|
|
)
|
|
)
|
|
|
|
if not isinstance(values, list):
|
|
values = [values]
|
|
for v in values:
|
|
qs = self.get_method(qs)(**{lookup: v})
|
|
return qs
|
|
|
|
|
|
class ApiFilterSetMixin(FilterSet):
|
|
o = CharFilter(method="filter_o")
|
|
|
|
class Meta:
|
|
fields = "__all__"
|
|
filter_overrides = {
|
|
FileField: {
|
|
"filter_class": django_filters.CharFilter,
|
|
"extra": lambda f: {
|
|
"lookup_expr": "exact",
|
|
},
|
|
},
|
|
CharField: {
|
|
"filter_class": SplitStringCharFilter,
|
|
},
|
|
TextField: {
|
|
"filter_class": SplitStringCharFilter,
|
|
},
|
|
JSONField: {
|
|
"filter_class": django_filters.CharFilter,
|
|
"extra": lambda f: {
|
|
"lookup_expr": "exact",
|
|
},
|
|
},
|
|
}
|
|
|
|
def filter_o(self, queryset, name, value):
|
|
try:
|
|
return queryset.order_by(*map(str.strip, value.split(",")))
|
|
except:
|
|
return queryset
|
|
|
|
@classmethod
|
|
def get_fields(cls):
|
|
model = cls._meta.model
|
|
fields_model = get_all_model_fields(model)
|
|
fields_filter = cls._meta.fields
|
|
exclude = cls._meta.exclude
|
|
|
|
if exclude is not None and fields_filter is None:
|
|
fields_filter = ALL_FIELDS
|
|
|
|
fields = fields_filter if isinstance(fields_filter, dict) else {}
|
|
|
|
for f_str in fields_model:
|
|
if f_str not in fields:
|
|
f = model._meta.get_field(f_str)
|
|
|
|
if f.many_to_many:
|
|
fields[f_str] = ["exact"]
|
|
continue
|
|
|
|
fields[f_str] = ["exact"]
|
|
|
|
def get_keys_lookups(cl, sub_f):
|
|
r = []
|
|
for lk, lv in cl.items():
|
|
if lk in (
|
|
"contained_by",
|
|
"trigram_similar",
|
|
"unaccent",
|
|
"search",
|
|
):
|
|
continue
|
|
|
|
sflk = f'{sub_f}{"__" if sub_f else ""}{lk}'
|
|
r.append(sflk)
|
|
|
|
if hasattr(lv, "get_lookups"):
|
|
r += get_keys_lookups(lv.get_lookups(), sflk)
|
|
|
|
if hasattr(lv, "output_field") and hasattr(
|
|
lv, "output_field.get_lookups"
|
|
):
|
|
r.append(f'{sflk}{"__" if sflk else ""}range')
|
|
|
|
r += get_keys_lookups(lv.output_field.class_lookups, sflk)
|
|
|
|
return r
|
|
|
|
fields[f_str] = list(
|
|
set(fields[f_str] + get_keys_lookups(f.get_lookups(), ""))
|
|
)
|
|
|
|
# Remove excluded fields
|
|
exclude = exclude or []
|
|
|
|
fields = [(f, lookups) for f, lookups in fields.items() if f not in exclude]
|
|
|
|
return OrderedDict(fields)
|
|
|
|
@classmethod
|
|
def filter_for_field(cls, f, name, lookup_expr="exact"):
|
|
# Redefine método estático para ignorar filtro para
|
|
# fields que não possuam lookup_expr informado
|
|
|
|
f, lookup_type = resolve_field(f, lookup_expr)
|
|
|
|
default = {
|
|
"field_name": name,
|
|
"label": capfirst(f.verbose_name),
|
|
"lookup_expr": lookup_expr,
|
|
}
|
|
|
|
filter_class, params = cls.filter_for_lookup(f, lookup_type)
|
|
default.update(params)
|
|
if filter_class is not None:
|
|
return filter_class(**default)
|
|
return None
|
|
|
|
|
|
class BusinessRulesNotImplementedMixin:
|
|
http_method_names = ["get", "head", "options", "trace"]
|
|
|
|
def create(self, request, *args, **kwargs):
|
|
raise Exception(_("POST Create não implementado"))
|
|
|
|
def update(self, request, *args, **kwargs):
|
|
raise Exception(_("PUT and PATCH não implementado"))
|
|
|
|
def delete(self, request, *args, **kwargs):
|
|
raise Exception(_("DELETE Delete não implementado"))
|
|
|
|
|
|
class ApiViewSetConstrutor:
|
|
_built_sets = {}
|
|
|
|
class ApiViewSet(ModelViewSet):
|
|
filter_backends = (DjangoFilterBackend,)
|
|
|
|
@classmethod
|
|
def get_viewset_for_model(cls, model):
|
|
return cls._built_sets[model._meta.app_config][model]
|
|
|
|
@classmethod
|
|
def update(cls, other):
|
|
cls._built_sets.update(other._built_sets)
|
|
|
|
@classmethod
|
|
def import_modules(cls, modules):
|
|
for m in modules:
|
|
importlib.import_module(m)
|
|
|
|
@classmethod
|
|
def router(cls, router_class=DefaultRouter):
|
|
router = router_class()
|
|
for app, built_sets in cls._built_sets.items():
|
|
app_label = getattr(app, "label", app.name.split(".")[-1])
|
|
for model, viewset in built_sets.items():
|
|
router.register(
|
|
f"{app.label}/{model._meta.model_name}",
|
|
viewset,
|
|
basename=f"{app_label}-{model._meta.model_name}",
|
|
)
|
|
return router
|
|
|
|
@classmethod
|
|
def build_class(cls, apps_or_models):
|
|
DRFAUTOAPI = settings.DRFAUTOAPI
|
|
|
|
serializers_classes = {}
|
|
filters_classes = {}
|
|
|
|
global_serializer_mixin = rest_serializers.ModelSerializer
|
|
global_filter_class = ApiFilterSetMixin
|
|
|
|
try:
|
|
if DRFAUTOAPI:
|
|
if "DEFAULT_SERIALIZER_MODULE" in DRFAUTOAPI:
|
|
serializers = importlib.import_module(
|
|
DRFAUTOAPI["DEFAULT_SERIALIZER_MODULE"]
|
|
)
|
|
serializers_classes = inspect.getmembers(serializers)
|
|
serializers_classes = {
|
|
i[0]: i[1]
|
|
for i in filter(
|
|
lambda x: x[0].endswith("Serializer"), serializers_classes
|
|
)
|
|
}
|
|
|
|
if "DEFAULT_FILTER_MODULE" in DRFAUTOAPI:
|
|
filters = importlib.import_module(
|
|
DRFAUTOAPI["DEFAULT_FILTER_MODULE"]
|
|
)
|
|
filters_classes = inspect.getmembers(filters)
|
|
filters_classes = {
|
|
i[0]: i[1]
|
|
for i in filter(
|
|
lambda x: x[0].endswith("FilterSet"), filters_classes
|
|
)
|
|
}
|
|
|
|
if "GLOBAL_SERIALIZER_MIXIN" in DRFAUTOAPI:
|
|
cs = DRFAUTOAPI["GLOBAL_SERIALIZER_MIXIN"].split(".")
|
|
module = importlib.import_module(".".join(cs[0:-1]))
|
|
global_serializer_mixin = getattr(module, cs[-1])
|
|
|
|
if "GLOBAL_FILTERSET_MIXIN" in DRFAUTOAPI:
|
|
cs = DRFAUTOAPI["GLOBAL_FILTERSET_MIXIN"].split(".")
|
|
m = importlib.import_module(".".join(cs[0:-1]))
|
|
global_filter_class = getattr(m, cs[-1])
|
|
|
|
except Exception as e:
|
|
logger.error(e)
|
|
|
|
built_sets = {}
|
|
|
|
def build(_model):
|
|
object_name = _model._meta.object_name
|
|
|
|
serializer_name = f"{object_name}Serializer"
|
|
_serializer_class = serializers_classes.get(
|
|
serializer_name, global_serializer_mixin
|
|
)
|
|
|
|
filter_name = f"{object_name}FilterSet"
|
|
_filterset_class = filters_classes.get(filter_name, global_filter_class)
|
|
|
|
def create_class():
|
|
_meta_serializer = (
|
|
object
|
|
if not hasattr(_serializer_class, "Meta")
|
|
else _serializer_class.Meta
|
|
)
|
|
|
|
class ApiSerializer(_serializer_class):
|
|
class Meta(_meta_serializer):
|
|
if not hasattr(_meta_serializer, "ref_name"):
|
|
ref_name = f"{object_name}Serializer"
|
|
|
|
if not hasattr(_meta_serializer, "model"):
|
|
model = _model
|
|
|
|
if hasattr(_meta_serializer, "exclude"):
|
|
exclude = _meta_serializer.exclude
|
|
else:
|
|
if not hasattr(_meta_serializer, "fields"):
|
|
fields = "__all__"
|
|
elif _meta_serializer.fields != "__all__":
|
|
fields = list(_meta_serializer.fields)
|
|
else:
|
|
fields = _meta_serializer.fields
|
|
|
|
_meta_filterset = (
|
|
object
|
|
if not hasattr(_filterset_class, "Meta")
|
|
else _filterset_class.Meta
|
|
)
|
|
|
|
class ApiFilterSet(_filterset_class):
|
|
class Meta(
|
|
_meta_filterset,
|
|
):
|
|
if not hasattr(_meta_filterset, "model"):
|
|
model = _model
|
|
|
|
class ModelApiViewSet(ApiViewSetConstrutor.ApiViewSet):
|
|
queryset = _model.objects.all()
|
|
filterset_class = ApiFilterSet
|
|
serializer_class = ApiSerializer
|
|
|
|
return ModelApiViewSet
|
|
|
|
viewset = create_class()
|
|
viewset.__name__ = "%sModelViewSet" % _model.__name__
|
|
return viewset
|
|
|
|
for am in apps_or_models:
|
|
if isinstance(am, ModelBase):
|
|
app = am._meta.app_config
|
|
else:
|
|
app = am
|
|
|
|
if app not in cls._built_sets:
|
|
cls._built_sets[app] = {}
|
|
|
|
if am != app:
|
|
cls._built_sets[app][am] = build(am)
|
|
continue
|
|
|
|
for model in app.get_models():
|
|
cls._built_sets[app][model] = build(model)
|
|
|
|
return cls
|
|
|
|
|
|
# Toda Classe construida acima, pode ser redefinida e aplicado quaisquer
|
|
# das possibilidades para uma classe normal criada a partir de
|
|
# rest_framework.viewsets.ModelViewSet conforme exemplo para a classe autor
|
|
|
|
# decorator que processa um endpoint detail trivial com base no model passado,
|
|
# Um endpoint detail geralmente é um conteúdo baseado numa FK com outros possíveis filtros
|
|
# e os passados pelo proprio cliente, além de o serializer e o filterset
|
|
# ser desse model passado
|
|
|
|
|
|
class wrapper_queryset_response_for_drf_action(object):
|
|
def __init__(self, model):
|
|
self.model = model
|
|
|
|
def __call__(self, cls):
|
|
def wrapper(instance_view, *args, **kwargs):
|
|
# recupera a viewset do model anotado
|
|
iv = instance_view
|
|
viewset_from_model = ApiViewSetConstrutor._built_sets[
|
|
self.model._meta.app_config
|
|
][self.model]
|
|
|
|
# apossa da instancia da viewset mae do action
|
|
# em uma viewset que processa dados do model passado no decorator
|
|
iv.queryset = viewset_from_model.queryset
|
|
iv.serializer_class = viewset_from_model.serializer_class
|
|
iv.filterset_class = viewset_from_model.filterset_class
|
|
|
|
iv.queryset = instance_view.filter_queryset(iv.get_queryset())
|
|
|
|
# chama efetivamente o metodo anotado que deve devolver um queryset
|
|
# com os filtros específicos definido pelo programador customizador
|
|
qs = cls(instance_view, *args, **kwargs)
|
|
|
|
page = iv.paginate_queryset(qs)
|
|
data = iv.get_serializer(page if page is not None else qs, many=True).data
|
|
|
|
return (
|
|
iv.get_paginated_response(data) if page is not None else Response(data)
|
|
)
|
|
|
|
return wrapper
|
|
|
|
|
|
# decorator para recuperar e transformar o default
|
|
class customize(object):
|
|
def __init__(self, model):
|
|
self.model = model
|
|
|
|
def __call__(self, cls):
|
|
class _ApiViewSet(
|
|
cls,
|
|
ApiViewSetConstrutor._built_sets[self.model._meta.app_config][self.model],
|
|
):
|
|
pass
|
|
|
|
if hasattr(_ApiViewSet, "build"):
|
|
_ApiViewSet = _ApiViewSet.build()
|
|
|
|
ApiViewSetConstrutor._built_sets[self.model._meta.app_config][
|
|
self.model
|
|
] = _ApiViewSet
|
|
return _ApiViewSet
|
|
|