|
@ -1,23 +1,30 @@ |
|
|
|
|
|
from datetime import datetime |
|
|
import inspect |
|
|
import inspect |
|
|
import logging |
|
|
import logging |
|
|
|
|
|
|
|
|
|
|
|
from PyPDF4.pdf import PdfFileReader |
|
|
|
|
|
from asn1crypto import cms |
|
|
from django.conf import settings |
|
|
from django.conf import settings |
|
|
from django.contrib.contenttypes.models import ContentType |
|
|
from django.contrib.contenttypes.models import ContentType |
|
|
from django.core import serializers |
|
|
from django.core import serializers |
|
|
|
|
|
from django.core.files.uploadedfile import InMemoryUploadedFile, UploadedFile |
|
|
|
|
|
from django.db.models.fields.files import FileField |
|
|
from django.db.models.signals import post_delete, post_save, \ |
|
|
from django.db.models.signals import post_delete, post_save, \ |
|
|
post_migrate |
|
|
post_migrate, pre_save |
|
|
from django.db.utils import DEFAULT_DB_ALIAS |
|
|
from django.db.utils import DEFAULT_DB_ALIAS |
|
|
from django.dispatch import receiver |
|
|
from django.dispatch import receiver |
|
|
from django.utils import timezone |
|
|
from django.utils import timezone |
|
|
from django.utils.translation import ugettext_lazy as _ |
|
|
from django.utils.translation import ugettext_lazy as _ |
|
|
|
|
|
|
|
|
from sapl.base.email_utils import do_envia_email_tramitacao |
|
|
from sapl.base.email_utils import do_envia_email_tramitacao |
|
|
from sapl.base.models import AuditLog, TipoAutor, Autor |
|
|
from sapl.base.models import AuditLog, TipoAutor, Autor, Metadata |
|
|
from sapl.decorators import receiver_multi_senders |
|
|
from sapl.decorators import receiver_multi_senders |
|
|
from sapl.materia.models import Tramitacao |
|
|
from sapl.materia.models import Tramitacao |
|
|
|
|
|
from sapl.parlamentares.models import Parlamentar |
|
|
from sapl.protocoloadm.models import TramitacaoAdministrativo |
|
|
from sapl.protocoloadm.models import TramitacaoAdministrativo |
|
|
from sapl.utils import get_base_url, models_with_gr_for_model |
|
|
from sapl.utils import get_base_url, models_with_gr_for_model |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
models_with_gr_for_autor = models_with_gr_for_model(Autor) |
|
|
models_with_gr_for_autor = models_with_gr_for_model(Autor) |
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -192,3 +199,270 @@ def cria_models_tipo_autor(app_config=None, verbosity=2, interactive=True, |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
post_migrate.connect(receiver=cria_models_tipo_autor) |
|
|
post_migrate.connect(receiver=cria_models_tipo_autor) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def signed_files_extraction_function(sender, instance, **kwargs): |
|
|
|
|
|
|
|
|
|
|
|
def run_signed_name_and_date_via_fields(fields): |
|
|
|
|
|
signs = [] |
|
|
|
|
|
|
|
|
|
|
|
for key, field in fields.items(): |
|
|
|
|
|
|
|
|
|
|
|
if '/FT' not in field and field['/FT'] != '/Sig': |
|
|
|
|
|
continue |
|
|
|
|
|
if '/V' not in field: |
|
|
|
|
|
continue |
|
|
|
|
|
|
|
|
|
|
|
content_sign = field['/V']['/Contents'] |
|
|
|
|
|
nome = 'Nome do assinante não localizado.' |
|
|
|
|
|
oname = '' |
|
|
|
|
|
try: |
|
|
|
|
|
info = cms.ContentInfo.load(content_sign) |
|
|
|
|
|
signed_data = info['content'] |
|
|
|
|
|
oun_old = [] |
|
|
|
|
|
for cert in signed_data['certificates']: |
|
|
|
|
|
subject = cert.native['tbs_certificate']['subject'] |
|
|
|
|
|
issuer = cert.native['tbs_certificate']['issuer'] |
|
|
|
|
|
oname = issuer.get('organization_name', '') |
|
|
|
|
|
|
|
|
|
|
|
if oname == 'Gov-Br': |
|
|
|
|
|
nome = subject['common_name'].split(':')[0] |
|
|
|
|
|
continue |
|
|
|
|
|
|
|
|
|
|
|
oun = subject['organizational_unit_name'] |
|
|
|
|
|
|
|
|
|
|
|
if isinstance(oun, str): |
|
|
|
|
|
continue |
|
|
|
|
|
|
|
|
|
|
|
if len(oun) > len(oun_old): |
|
|
|
|
|
oun_old = oun |
|
|
|
|
|
nome = subject['common_name'].split(':')[0] |
|
|
|
|
|
|
|
|
|
|
|
if oun and isinstance(oun, list) and len(oun) == 4: |
|
|
|
|
|
oname += ' - ' + oun[3] |
|
|
|
|
|
break |
|
|
|
|
|
|
|
|
|
|
|
except: |
|
|
|
|
|
if '/Name' in field['/V']: |
|
|
|
|
|
nome = field['/V']['/Name'] |
|
|
|
|
|
|
|
|
|
|
|
fd = None |
|
|
|
|
|
try: |
|
|
|
|
|
data = str(field['/V']['/M']) |
|
|
|
|
|
|
|
|
|
|
|
if 'D:' not in data: |
|
|
|
|
|
data = None |
|
|
|
|
|
else: |
|
|
|
|
|
if not data.endswith('Z'): |
|
|
|
|
|
data = data.replace('Z', '+') |
|
|
|
|
|
data = data.replace("'", '') |
|
|
|
|
|
|
|
|
|
|
|
fd = datetime.strptime(data[2:], '%Y%m%d%H%M%S%z') |
|
|
|
|
|
except: |
|
|
|
|
|
pass |
|
|
|
|
|
|
|
|
|
|
|
signs.append((nome, [fd, oname])) |
|
|
|
|
|
|
|
|
|
|
|
return signs |
|
|
|
|
|
|
|
|
|
|
|
def run_signed_name_and_date_extract(file): |
|
|
|
|
|
signs = {} |
|
|
|
|
|
fields = {} |
|
|
|
|
|
pdfdata = file.read() |
|
|
|
|
|
|
|
|
|
|
|
# se não tem byterange então não é assinado |
|
|
|
|
|
byterange = [] |
|
|
|
|
|
n = -1 |
|
|
|
|
|
while True: |
|
|
|
|
|
n = pdfdata.find(b"/ByteRange", n + 1) |
|
|
|
|
|
if n == -1: |
|
|
|
|
|
break |
|
|
|
|
|
byterange.append(n) |
|
|
|
|
|
|
|
|
|
|
|
if not byterange: |
|
|
|
|
|
return signs |
|
|
|
|
|
|
|
|
|
|
|
# tenta extrair via /Fields |
|
|
|
|
|
try: |
|
|
|
|
|
pdf = PdfFileReader(file) |
|
|
|
|
|
fields = pdf.getFields() |
|
|
|
|
|
except Exception as e: |
|
|
|
|
|
try: |
|
|
|
|
|
pdf = PdfFileReader(file, strict=False) |
|
|
|
|
|
fields = pdf.getFields() |
|
|
|
|
|
except Exception as ee: |
|
|
|
|
|
fields = ee |
|
|
|
|
|
|
|
|
|
|
|
try: |
|
|
|
|
|
# se a extração via /Fields ocorrer sem erros e forem capturadas |
|
|
|
|
|
# tantas assinaturas quanto byteranges |
|
|
|
|
|
if isinstance(fields, dict): |
|
|
|
|
|
signs = run_signed_name_and_date_via_fields(fields) |
|
|
|
|
|
if len(signs) == len(byterange): |
|
|
|
|
|
return signs |
|
|
|
|
|
|
|
|
|
|
|
for n in byterange: |
|
|
|
|
|
|
|
|
|
|
|
start = pdfdata.find(b"[", n) |
|
|
|
|
|
stop = pdfdata.find(b"]", start) |
|
|
|
|
|
assert n != -1 and start != -1 and stop != -1 |
|
|
|
|
|
n += 1 |
|
|
|
|
|
|
|
|
|
|
|
br = [int(i, 10) for i in pdfdata[start + 1: stop].split()] |
|
|
|
|
|
contents = pdfdata[br[0] + br[1] + 1: br[2] - 1] |
|
|
|
|
|
bcontents = bytes.fromhex(contents.decode("utf8")) |
|
|
|
|
|
data1 = pdfdata[br[0]: br[0] + br[1]] |
|
|
|
|
|
data2 = pdfdata[br[2]: br[2] + br[3]] |
|
|
|
|
|
#signedData = data1 + data2 |
|
|
|
|
|
|
|
|
|
|
|
nome = 'Nome do assinante não localizado.' |
|
|
|
|
|
oname = '' |
|
|
|
|
|
try: |
|
|
|
|
|
info = cms.ContentInfo.load(bcontents) |
|
|
|
|
|
signed_data = info['content'] |
|
|
|
|
|
|
|
|
|
|
|
oun_old = [] |
|
|
|
|
|
for cert in signed_data['certificates']: |
|
|
|
|
|
subject = cert.native['tbs_certificate']['subject'] |
|
|
|
|
|
issuer = cert.native['tbs_certificate']['issuer'] |
|
|
|
|
|
oname = issuer.get('organization_name', '') |
|
|
|
|
|
|
|
|
|
|
|
if oname == 'Gov-Br': |
|
|
|
|
|
nome = subject['common_name'].split(':')[0] |
|
|
|
|
|
continue |
|
|
|
|
|
|
|
|
|
|
|
oun = subject['organizational_unit_name'] |
|
|
|
|
|
|
|
|
|
|
|
if isinstance(oun, str): |
|
|
|
|
|
continue |
|
|
|
|
|
|
|
|
|
|
|
if len(oun) > len(oun_old): |
|
|
|
|
|
oun_old = oun |
|
|
|
|
|
nome = subject['common_name'].split(':')[0] |
|
|
|
|
|
|
|
|
|
|
|
if oun and isinstance(oun, list) and len(oun) == 4: |
|
|
|
|
|
oname += ' - ' + oun[3] |
|
|
|
|
|
break |
|
|
|
|
|
|
|
|
|
|
|
except Exception as e: |
|
|
|
|
|
pass |
|
|
|
|
|
|
|
|
|
|
|
fd = None |
|
|
|
|
|
signs.append((nome, [fd, oname])) |
|
|
|
|
|
|
|
|
|
|
|
except Exception as e: |
|
|
|
|
|
pass |
|
|
|
|
|
|
|
|
|
|
|
return signs |
|
|
|
|
|
|
|
|
|
|
|
def signed_name_and_date_extract(file): |
|
|
|
|
|
|
|
|
|
|
|
try: |
|
|
|
|
|
signs = run_signed_name_and_date_extract(file) |
|
|
|
|
|
except: |
|
|
|
|
|
return {} |
|
|
|
|
|
|
|
|
|
|
|
signs = sorted(signs, key=lambda sign: ( |
|
|
|
|
|
sign[0], sign[1][1], sign[1][0])) |
|
|
|
|
|
|
|
|
|
|
|
signs_dict = {} |
|
|
|
|
|
|
|
|
|
|
|
for s in signs: |
|
|
|
|
|
# or 'ICP' in s[1][1] and 'ICP' not in signs_dict[s[0]][1]: |
|
|
|
|
|
if s[0] not in signs_dict: |
|
|
|
|
|
signs_dict[s[0]] = s[1] |
|
|
|
|
|
|
|
|
|
|
|
signs = sorted(signs_dict.items(), key=lambda sign: ( |
|
|
|
|
|
sign[0], sign[1][1], sign[1][0])) |
|
|
|
|
|
|
|
|
|
|
|
sr = [] |
|
|
|
|
|
|
|
|
|
|
|
for s in signs: |
|
|
|
|
|
tt = s[0].title().split(' ') |
|
|
|
|
|
for idx, t in enumerate(tt): |
|
|
|
|
|
if t in ('Dos', 'De', 'Da', 'Do', 'Das', 'E'): |
|
|
|
|
|
tt[idx] = t.lower() |
|
|
|
|
|
sr.append((' '.join(tt), s[1])) |
|
|
|
|
|
|
|
|
|
|
|
signs = sr |
|
|
|
|
|
|
|
|
|
|
|
meta_signs = { |
|
|
|
|
|
'autores': [], |
|
|
|
|
|
'admin': [] |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
for s in signs: |
|
|
|
|
|
# cn = # settings.CERT_PRIVATE_KEY_NAME |
|
|
|
|
|
#meta_signs['admin' if s[0] == cn else 'autores'].append(s) |
|
|
|
|
|
meta_signs['autores'].append(s) |
|
|
|
|
|
return meta_signs |
|
|
|
|
|
|
|
|
|
|
|
def filefield_from_model(m): |
|
|
|
|
|
fields = m._meta.get_fields() |
|
|
|
|
|
fields = tuple(map(lambda f: f.name, filter( |
|
|
|
|
|
lambda x: isinstance(x, FileField), fields))) |
|
|
|
|
|
return fields |
|
|
|
|
|
|
|
|
|
|
|
FIELDFILE_NAME = filefield_from_model(instance) |
|
|
|
|
|
|
|
|
|
|
|
if not FIELDFILE_NAME: |
|
|
|
|
|
return |
|
|
|
|
|
|
|
|
|
|
|
try: |
|
|
|
|
|
md = Metadata.objects.get( |
|
|
|
|
|
content_type=ContentType.objects.get_for_model( |
|
|
|
|
|
instance._meta.model), |
|
|
|
|
|
object_id=instance.id,).metadata |
|
|
|
|
|
except: |
|
|
|
|
|
md = {} |
|
|
|
|
|
|
|
|
|
|
|
for fn in FIELDFILE_NAME: # fn -> field_name |
|
|
|
|
|
ff = getattr(instance, fn) # ff -> file_field |
|
|
|
|
|
|
|
|
|
|
|
if md and 'signs' in md and \ |
|
|
|
|
|
fn in md['signs'] and\ |
|
|
|
|
|
md['signs'][fn]: |
|
|
|
|
|
md['signs'][fn] = {} |
|
|
|
|
|
|
|
|
|
|
|
if not ff: |
|
|
|
|
|
continue |
|
|
|
|
|
|
|
|
|
|
|
try: |
|
|
|
|
|
file = ff.file.file |
|
|
|
|
|
meta_signs = {} |
|
|
|
|
|
if not isinstance(ff.file, UploadedFile): |
|
|
|
|
|
absolute_path = ff.path |
|
|
|
|
|
with open(absolute_path, "rb") as file: |
|
|
|
|
|
meta_signs = signed_name_and_date_extract(file) |
|
|
|
|
|
file.close() |
|
|
|
|
|
else: |
|
|
|
|
|
file.seek(0) |
|
|
|
|
|
meta_signs = signed_name_and_date_extract(file) |
|
|
|
|
|
|
|
|
|
|
|
if not meta_signs or not meta_signs['autores'] and not meta_signs['admin']: |
|
|
|
|
|
continue |
|
|
|
|
|
|
|
|
|
|
|
if not md: |
|
|
|
|
|
md = {'signs': {}} |
|
|
|
|
|
|
|
|
|
|
|
if 'signs' not in md: |
|
|
|
|
|
md['signs'] = {} |
|
|
|
|
|
|
|
|
|
|
|
md['signs'][fn] = meta_signs |
|
|
|
|
|
except Exception as e: |
|
|
|
|
|
# print(e) |
|
|
|
|
|
pass |
|
|
|
|
|
|
|
|
|
|
|
if md: |
|
|
|
|
|
metadata = Metadata.objects.get_or_create( |
|
|
|
|
|
content_type=ContentType.objects.get_for_model( |
|
|
|
|
|
instance._meta.model), |
|
|
|
|
|
object_id=instance.id,) |
|
|
|
|
|
metadata[0].metadata = md |
|
|
|
|
|
metadata[0].save() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@receiver(pre_save, dispatch_uid='signed_files_extraction_pre_save_signal') |
|
|
|
|
|
def signed_files_extraction_pre_save_signal(sender, instance, **kwargs): |
|
|
|
|
|
|
|
|
|
|
|
signed_files_extraction_function(sender, instance, **kwargs) |
|
|