# models.py 9.3 KB
import os

from django.contrib.postgres.fields import ArrayField, JSONField
from django.contrib.auth import get_user_model
from django.db import models
from django.db.models import UniqueConstraint
from partial_date import PartialDateField
from languages.fields import LanguageField

from pipeline.models import Pipeline, ProcessingStatus

# Resolve the project's active user model once at import time; used below as
# the target of Annotation.user.
User = get_user_model()


class Document(models.Model):
    """Document record attached to a Pipeline and a ProcessingStatus.

    A document may be a subdocument of another one: it then references it
    through ``parent`` and is positioned among its siblings by ``sequence``.
    Files belonging to the document are looked up in the directory stored in
    ``path`` by filename prefix (``source``, ``text``, ``meta``, ``header``,
    ``tmp``).
    """
    name = models.TextField()
    source_id = models.CharField(max_length=32, blank=True)
    lang = models.CharField(max_length=5)
    original_lang = LanguageField()
    pipeline = models.ForeignKey(Pipeline, related_name='documents', on_delete=models.CASCADE)
    image = models.BooleanField(default=False)
    broken_source = models.BooleanField(default=False)
    in_effect = models.BooleanField(default=False)
    indexed = models.BooleanField(default=False)
    changed = models.BooleanField(default=False)
    title = models.TextField(blank=True)
    publication_date = PartialDateField(null=True, blank=True)
    publication_place = models.CharField(max_length=100, blank=True)
    creation_time = models.DateTimeField(auto_now_add=True)
    keywords = models.ManyToManyField('Keyword', related_name='documents', blank=True)
    meta_url = models.URLField(blank=True)
    source_url = models.URLField(blank=True)
    file_url = models.URLField(blank=True)
    # Directory scanned by the *_path / remove_* / clean_directory methods.
    path = models.TextField()
    channel = models.CharField(max_length=50)
    type = models.CharField(max_length=50)
    text_origin = models.CharField(max_length=50)
    status = models.CharField(max_length=100)
    processing_status = models.ForeignKey(ProcessingStatus, related_name='documents', on_delete=models.CASCADE)
    new = models.BooleanField(default=True)
    unk_coverage = models.FloatField(default=0.0)
    # Set for subdocuments, None otherwise.
    parent = models.ForeignKey("self", null=True, default=None, blank=True, on_delete=models.CASCADE)
    # Position among documents sharing the same parent; renumbered in delete().
    sequence = models.PositiveIntegerField(default=0)

    def delete(self, *args, **kwargs):
        """Delete the document and close the gap it leaves among its siblings.

        After the row is removed, every document with the same parent and a
        higher ``sequence`` is shifted down by one.

        Returns:
            The value returned by ``Model.delete()``, passed through unchanged.
        """
        result = super().delete(*args, **kwargs)
        # parent_id is still set on the instance after deletion and, unlike
        # self.parent, does not need to load the parent row.
        if self.parent_id is not None:
            later_siblings = Document.objects.filter(
                parent_id=self.parent_id,
                sequence__gt=self.sequence,
            ).order_by('sequence')
            # Ascending order: each sibling moves into the slot freed just before it.
            for subdoc in later_siblings:
                subdoc.sequence -= 1
                subdoc.save()
        return result

    def annotated(self):
        """Return True if any chunk, or any utterance of a chunk, has a truthy ``anno``."""
        for chunk in self.chunks.all():
            if chunk.anno:
                return True
            if any(utt.anno for utt in chunk.utterances.all()):
                return True
        return False

    def _paths_with_prefix(self, prefix):
        """Yield the full path of each entry in ``self.path`` whose name starts with ``prefix``."""
        for filename in os.listdir(self.path):
            if filename.startswith(prefix):
                yield os.path.join(self.path, filename)

    def clean_directory(self):
        """Remove every file in ``self.path`` whose name starts with ``tmp``."""
        for tmp_file in self._paths_with_prefix('tmp'):
            os.remove(tmp_file)

    def clear_annotation(self):
        """Set ``anno`` to None on every chunk and utterance of the document and save each one."""
        for chunk in self.chunks.all():
            chunk.anno = None
            chunk.save()
            for utt in chunk.utterances.all():
                utt.anno = None
                utt.save()

    def header_path(self):
        """Return the path of the first ``header*`` file in ``self.path``, or None."""
        return next(self._paths_with_prefix('header'), None)

    def meta_path(self):
        """Return the path of the first ``meta*`` file in ``self.path``, or None."""
        return next(self._paths_with_prefix('meta'), None)

    def remove_source_files(self):
        """Remove every file in ``self.path`` whose name starts with ``source``."""
        for src_file in self._paths_with_prefix('source'):
            os.remove(src_file)

    def remove_text_files(self):
        """Remove every file in ``self.path`` whose name starts with ``text``."""
        for text_file in self._paths_with_prefix('text'):
            os.remove(text_file)

    def source_path(self):
        """Return the path of the first ``source*`` file in ``self.path``, or None."""
        return next(self._paths_with_prefix('source'), None)

    def text_path(self):
        """Return the path of the first ``text*`` file that is not an "original" variant, or None.

        A file counts as an "original" variant when ``original`` is one of the
        dot-separated parts of its name.
        """
        for candidate in self._paths_with_prefix('text'):
            if 'original' not in os.path.basename(candidate).split('.'):
                return candidate
        return None

    def change_processing_status(self, status_key):
        """Assign the ProcessingStatus with the given key to this document and its subdocuments.

        Args:
            status_key: value matched against ``ProcessingStatus.key``.

        The lookup uses ``objects.get``, so it raises when no single status
        matches the key.
        """
        status = ProcessingStatus.objects.get(key=status_key)
        self.processing_status = status
        # filter() always yields a queryset (possibly empty), so no None check is needed.
        for subdoc in Document.objects.filter(parent=self):
            subdoc.processing_status = status
            subdoc.save()
        self.save()

    def original_lang_error(self):
        """Return True when the original language is 'pl' yet a translator participant exists."""
        # exists() asks the database for a single row instead of loading all matches.
        return self.original_lang == 'pl' and self.participants.filter(role='translator').exists()

    def missing_translator(self):
        """Return True when the original language is not 'pl' and no translator participant exists."""
        return self.original_lang != 'pl' and not self.participants.filter(role='translator').exists()

    def check_details_filling(self):
        """Return True when the required detail fields are filled in.

        Title and original language are always required; documents without a
        parent additionally need a publication date and a publication place.
        """
        if self.title == '' or self.original_lang == '':
            return False
        if self.parent_id is None:
            return self.publication_date is not None and self.publication_place != ''
        return True

    @staticmethod
    def get_doc_type_display():
        """Return the display label of this document type."""
        return 'dokument'

    class Meta:
        db_table = 'document'
        ordering = ['id']
        constraints = [
            models.UniqueConstraint(fields=['name', 'sequence'], name='unique subdocument sequence')
        ]

    def __str__(self):
        return f'{self.name}-{self.id}'


class Magazine(Document):
    """Document subtype that adds a ``number`` field."""
    number = models.CharField(max_length=50, null=True)

    def check_details_filling(self):
        """Return True when title, publication date and place, number and original language are filled in."""
        # number is nullable, so a missing value may be None as well as '';
        # `not self.number` rejects both.
        if self.title == '' or self.publication_date is None or self.publication_place == '' or not self.number \
                or self.original_lang == '':
            return False
        return True

    @staticmethod
    def get_doc_type_display():
        """Return the display label of this document type."""
        return 'czasopismo'


class BookWithMultipleAuthors(Document):
    """Document subtype that adds a ``publisher`` field."""
    publisher = models.CharField(max_length=100, null=True)

    def check_details_filling(self):
        """Return True when title, publication date and place, publisher and original language are filled in."""
        # publisher is nullable, so a missing value may be None as well as '';
        # `not self.publisher` rejects both.
        if self.title == '' or self.publication_date is None or self.publication_place == '' \
                or not self.publisher or self.original_lang == '':
            return False
        return True

    @staticmethod
    def get_doc_type_display():
        """Return the display label of this document type."""
        return 'książka o wielu autorach'


class Article(Document):
    """Document subtype with no extra fields; only overrides the display label."""
    @staticmethod
    def get_doc_type_display():
        """Return the display label of this document type."""
        return 'artykuł'


class Chapter(Document):
    """Document subtype with no extra fields; only overrides the display label."""
    @staticmethod
    def get_doc_type_display():
        """Return the display label of this document type."""
        return 'rozdział'


class Chunk(models.Model):
    """A numbered piece of a Document holding its text and an optional JSON ``anno``."""
    document = models.ForeignKey(Document, related_name='chunks', on_delete=models.CASCADE)
    sequence = models.PositiveIntegerField()
    original_text = models.TextField(blank=True)
    text = models.TextField(blank=True)
    anno = JSONField(blank=True, null=True)

    class Meta:
        db_table = 'chunk'
        ordering = ['sequence']

    def __str__(self):
        """Return the chunk's utterances joined by blank lines, or its text when it has none."""
        # `utterances` is a reverse relation declared outside this section.
        if not self.utterances.exists():
            return self.text
        in_order = self.utterances.order_by('sequence')
        return '\n\n'.join(str(utterance) for utterance in in_order)


class Keyword(models.Model):
    """A uniquely labelled keyword that documents reference through Document.keywords."""
    label = models.CharField(max_length=100, unique=True)
    # Optional array of floats associated with the keyword.
    vector = ArrayField(models.FloatField(), null=True)

    class Meta:
        db_table = 'keyword'
        ordering = ['label']

    def __str__(self):
        """Return the keyword's label."""
        return self.label


class Metadata(models.Model):
    """A named value attached to a Document, optionally ordered by ``sequence``."""
    document = models.ForeignKey(Document, related_name='metadata', on_delete=models.CASCADE)
    name = models.CharField(max_length=24)
    target = models.TextField(blank=True)
    sequence = models.PositiveIntegerField(null=True, blank=True)
    value = models.TextField()

    class Meta:
        db_table = 'metadata'
        ordering = ['sequence']

    def __str__(self):
        """Return ``name`` and ``value`` separated by a colon and a tab."""
        return f'{self.name}:\t{self.value}'


class Participant(models.Model):
    """A person or entity taking part in a Document, with a role, a type and a gender."""
    abbrev = models.TextField()
    document = models.ForeignKey(Document, related_name='participants', on_delete=models.CASCADE)
    name = models.TextField()
    order = models.PositiveIntegerField()
    role = models.CharField(max_length=20)
    type = models.CharField(max_length=20)
    GENDER_CHOICES = (
        ('M', 'mężczyzna'),
        ('F', 'kobieta'),
        ('U', 'nieznana')
    )
    gender = models.CharField(max_length=1, choices=GENDER_CHOICES)

    def get_role_display(self):
        """Return the display label for ``role``, or None for any role other than author/translator."""
        labels_by_role = {
            'author': 'autor',
            'translator': 'tłumacz',
        }
        return labels_by_role.get(self.role)

    class Meta:
        db_table = 'participant'
        ordering = ['-type', 'order']

    def __str__(self):
        """Return ``name`` and ``role`` separated by a tab."""
        return f'{self.name}\t{self.role}'


class Annotation(models.Model):
    """Records that a user is annotating (or has finished annotating) a Document."""
    document = models.ForeignKey(Document, related_name='annotations', on_delete=models.CASCADE)
    user = models.ForeignKey(User, related_name='annotations', on_delete=models.CASCADE)
    start_time = models.DateTimeField(auto_now_add=True)
    finish_time = models.DateTimeField(blank=True, null=True)
    finished = models.BooleanField(default=False)

    def __str__(self):
        """Return ``<username>-<document name>-<document pk>``."""
        username = self.user.username
        doc = self.document
        return f'{username}-{doc.name}-{doc.pk}'

    class Meta:
        ordering = ['finish_time']