# models.py 10.3 KB
import importlib
import os

from django.contrib.postgres.fields import ArrayField, JSONField
from django.contrib.auth import get_user_model
from django.db import models

from pipeline.models import Pipeline

# Resolve the user model through Django's get_user_model() instead of importing
# a concrete class; it is used as the target of Annotation.user below.
User = get_user_model()


class Document(models.Model):
    """A document attached to a :class:`Pipeline`, with its files kept in the directory ``path``.

    Project-specific accessors (``get_authors``, ``get_title``, ...) delegate to
    the module ``projects.<project name>.mappings``, where the project name is
    taken from ``self.pipeline.project.name``.  File helpers (``*_path``,
    ``remove_*``, ``clean_directory``) look entries up in ``self.path`` by
    file-name prefix.
    """
    name = models.CharField(max_length=100, unique=True)
    source_id = models.CharField(max_length=32)
    lang = models.CharField(max_length=7)
    pipeline = models.ForeignKey(Pipeline, related_name='documents', on_delete=models.CASCADE)
    image = models.BooleanField(default=False)
    broken_source = models.BooleanField(default=False)
    in_effect = models.BooleanField(default=False)
    indexed = models.BooleanField(default=False)
    changed = models.BooleanField(default=False)
    new = models.BooleanField(default=True)
    selected = models.BooleanField(default=False)
    publication_date = models.DateField(blank=True, null=True)
    publisher = models.CharField(max_length=200)
    issue = models.ForeignKey('curlicat.Issue', related_name='documents', on_delete=models.CASCADE,
                              blank=True, null=True)
    creation_time = models.DateTimeField(auto_now_add=True)
    keywords = models.ManyToManyField('Keyword', related_name='documents', blank=True)
    scientific_disciplines = models.ManyToManyField('curlicat.ScientificDiscipline', related_name='documents',
                                                    blank=True)
    meta_url = models.URLField(blank=True)
    source_url = models.URLField(blank=True)
    file_url = models.URLField(blank=True)
    # Directory that is listed with os.listdir() by the file helpers below.
    path = models.TextField()
    title = models.TextField()
    type = models.CharField(max_length=50)
    status = models.CharField(max_length=100)
    unk_coverage = models.FloatField(default=0.0)
    # Self-referential link to another Document; deleting the parent cascades.
    parent = models.ForeignKey("self", null=True, default=None, on_delete=models.CASCADE)
    sequence = models.PositiveIntegerField(default=0)

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _mappings(self):
        """Import and return the ``projects.<project name>.mappings`` module for this document."""
        return importlib.import_module('projects.%s.mappings' % self.pipeline.project.name)

    def _paths_with_prefix(self, prefix):
        """Return the full paths of entries in ``self.path`` whose names start with *prefix*.

        Entries are returned in ``os.listdir`` order, which is arbitrary.
        """
        return [os.path.join(self.path, filename)
                for filename in os.listdir(self.path)
                if filename.startswith(prefix)]

    def _first_path_with_prefix(self, prefix):
        """Return the first path from :meth:`_paths_with_prefix`, or ``None`` if there is none."""
        return next(iter(self._paths_with_prefix(prefix)), None)

    def _remove_with_prefix(self, prefix):
        """Delete every entry in ``self.path`` whose name starts with *prefix*."""
        for file_path in self._paths_with_prefix(prefix):
            os.remove(file_path)

    # ------------------------------------------------------------------
    # Annotation state
    # ------------------------------------------------------------------

    def annotated(self):
        """Return True if any chunk, utterance or metadata entry has a truthy ``anno``."""
        return self.chunks_annotated() or any(meta.anno for meta in self.metadata.all())

    def chunks_annotated(self):
        """Return True if any chunk, or any utterance of a chunk, has a truthy ``anno``."""
        return any(
            chunk.anno or any(utt.anno for utt in chunk.utterances.all())
            for chunk in self.chunks.all()
        )

    def clean_directory(self):
        """Delete the entries in ``self.path`` whose names start with ``tmp``."""
        self._remove_with_prefix('tmp')

    def clear_annotation(self):
        """Set ``anno`` to None on every chunk and utterance and save each one.

        Metadata annotations are not touched by this method.
        """
        for chunk in self.chunks.all():
            chunk.anno = None
            chunk.save()
            for utt in chunk.utterances.all():
                utt.anno = None
                utt.save()

    # ------------------------------------------------------------------
    # Project-specific accessors (delegated to the project's mappings module)
    # ------------------------------------------------------------------

    def get_authors(self):
        """Return ``get_authors(self)`` from the project's mappings module."""
        return self._mappings().get_authors(self)

    def get_reviewers(self):
        """Return ``get_reviewers(self)`` from the project's mappings module."""
        return self._mappings().get_reviewers(self)

    def get_translators(self):
        """Return ``get_translators(self)`` from the project's mappings module."""
        return self._mappings().get_translators(self)

    def get_abstract_anno(self):
        """Return ``get_abstract_anno(self)`` from the project's mappings module."""
        return self._mappings().get_abstract_anno(self)

    def get_en_abstract(self):
        """Return ``get_en_abstract(self)`` from the project's mappings module."""
        return self._mappings().get_en_abstract(self)

    def get_date(self):
        """Return ``get_date(self)`` from the project's mappings module."""
        return self._mappings().get_date(self)

    def get_keywords(self):
        """Return ``get_keywords(self)`` from the project's mappings module."""
        return self._mappings().get_keywords(self)

    def get_en_keywords(self):
        """Return ``get_en_keywords(self)`` from the project's mappings module."""
        return self._mappings().get_en_keywords(self)

    def get_lang(self):
        """Return ``get_lang(self)`` from the project's mappings module."""
        return self._mappings().get_lang(self)

    def get_source_url(self):
        """Return ``get_source_url(self)`` from the project's mappings module."""
        return self._mappings().get_source_url(self)

    def get_status(self):
        """Return ``get_status(self)`` from the project's mappings module."""
        return self._mappings().get_status(self)

    def get_title_anno(self):
        """Return ``get_title_anno(self)`` from the project's mappings module."""
        return self._mappings().get_title_anno(self)

    def get_title(self):
        """Return ``get_title(self)`` from the project's mappings module."""
        return self._mappings().get_title(self)

    def get_en_title(self):
        """Return ``get_en_title(self)`` from the project's mappings module."""
        return self._mappings().get_en_title(self)

    # ------------------------------------------------------------------
    # Files in ``self.path``
    # ------------------------------------------------------------------

    def header_path(self):
        """Return the path of the first entry whose name starts with ``header``, or None."""
        return self._first_path_with_prefix('header')

    def meta_path(self):
        """Return the path of the first entry whose name starts with ``meta``, or None."""
        return self._first_path_with_prefix('meta')

    def remove_source_files(self):
        """Delete the entries in ``self.path`` whose names start with ``source``."""
        self._remove_with_prefix('source')

    def remove_text_files(self):
        """Delete the entries in ``self.path`` whose names start with ``text``."""
        self._remove_with_prefix('text')

    def scientific_fields(self):
        """Return the distinct ``field`` values of the linked scientific disciplines.

        Order of first appearance is preserved.
        """
        sfs = []
        for sd in self.scientific_disciplines.all():
            if sd.field not in sfs:
                sfs.append(sd.field)
        return sfs

    def source_path(self):
        """Return the path of the first entry whose name starts with ``source``, or None."""
        return self._first_path_with_prefix('source')

    def text_path(self):
        """Return the path of the first ``text*`` entry that is not an "original" variant, or None.

        An entry is skipped when ``original`` is one of the dot-separated parts
        of its file name.
        """
        for file_path in self._paths_with_prefix('text'):
            if 'original' not in os.path.basename(file_path).split('.'):
                return file_path
        return None

    # ------------------------------------------------------------------
    # Statistics
    # ------------------------------------------------------------------

    def words_count(self):
        """Return the sum of ``words_count()`` over all chunks of the document."""
        return sum(chunk.words_count() for chunk in self.chunks.all())

    def pl_chunks_longer_than_min(self):
        """Return the annotated ``lang='pl'`` chunks with more segments than the project minimum.

        The threshold is ``MIN_SEGMENTS_BY_CHUNK`` from the project's mappings
        module; chunks without a truthy ``anno`` are skipped.
        """
        min_segments = self._mappings().MIN_SEGMENTS_BY_CHUNK
        return [chunk for chunk in self.chunks.filter(lang='pl')
                if chunk.anno and chunk.segments_count() > min_segments]

    class Meta:
        db_table = 'document'
        ordering = ['id']

    def __str__(self):
        """Return the document's unique name."""
        return self.name


class Chunk(models.Model):
    """An ordered piece of a :class:`Document`, holding text and an optional JSON annotation.

    A chunk may also have related ``utterances``; when it does, counts include
    them and ``str(chunk)`` is built from the utterances instead of ``text``.
    """
    document = models.ForeignKey(Document, related_name='chunks', on_delete=models.CASCADE)
    lang = models.CharField(max_length=7, default='pl')
    sequence = models.PositiveIntegerField()
    original_text = models.TextField(blank=True)
    text = models.TextField(blank=True)
    # Nullable: a chunk that has not been annotated has anno == None.
    anno = JSONField(blank=True, null=True)

    def words_count(self):
        """Return the number of whitespace-separated words in ``text`` plus those of all utterances."""
        own_words = len(self.text.split())
        return own_words + sum(utt.words_count() for utt in self.utterances.all())

    def segments_count(self):
        """Return the number of tokens in the annotation plus the segments of all utterances.

        Tokens are read from ``anno['chunks'][*]['sentences'][*]['tokens']``.
        A chunk without an annotation contributes no tokens of its own, so only
        its utterances are counted instead of failing on ``None``.
        """
        own_segments = 0
        if self.anno:
            for anno_chunk in self.anno['chunks']:
                for sent in anno_chunk['sentences']:
                    own_segments += len(sent['tokens'])
        return own_segments + sum(utt.segments_count() for utt in self.utterances.all())

    class Meta:
        db_table = 'chunk'
        ordering = ['sequence']

    def __str__(self):
        """Return the utterances joined by blank lines (in ``sequence`` order), or ``text`` if there are none."""
        if self.utterances.exists():
            return '\n\n'.join(str(utt) for utt in self.utterances.order_by('sequence'))
        return self.text


class Keyword(models.Model):
    """A keyword label in a given language, optionally carrying a vector of floats.

    The ``(label, lang)`` pair is unique and keywords are ordered by ``label``.
    """
    label = models.TextField()
    lang = models.CharField(max_length=7, default='pl')
    vector = ArrayField(models.FloatField(), null=True)

    class Meta:
        db_table = 'keyword'
        ordering = ['label']
        unique_together = ('label', 'lang',)

    def __str__(self):
        """Render the keyword as ``<label> [<lang>]``."""
        return '{} [{}]'.format(self.label, self.lang)


class Metadata(models.Model):
    """A named metadata value attached to a :class:`Document`, with an optional JSON annotation.

    Entries are ordered by ``sequence``.
    """
    document = models.ForeignKey(Document, related_name='metadata', on_delete=models.CASCADE)
    anno = JSONField(blank=True, null=True)
    lang = models.CharField(max_length=7, default='pl')
    name = models.CharField(max_length=24)
    target = models.TextField(blank=True)
    sequence = models.PositiveIntegerField(null=True, blank=True)
    value = models.TextField()

    class Meta:
        db_table = 'metadata'
        ordering = ['sequence']

    def __str__(self):
        """Render as ``<name>:<TAB><value>``, followed by `` [<lang>]`` when a language is set."""
        # Guard clause: entries without a language omit the bracketed suffix.
        if not self.lang:
            return '%s:\t%s' % (self.name, self.value)
        return '%s:\t%s [%s]' % (self.name, self.value, self.lang)


class Participant(models.Model):
    """A person taking part in a :class:`Document` in some role.

    Participants are ordered by ``type`` descending, then by ``order``.
    """
    abbrev = models.TextField()
    biography = models.TextField(blank=True)
    document = models.ForeignKey(Document, related_name='participants', on_delete=models.CASCADE)
    email = models.EmailField(max_length=254, blank=True)
    name = models.TextField()
    first_name = models.TextField(blank=True)
    last_name = models.TextField(blank=True)
    order = models.PositiveIntegerField()
    role = models.CharField(max_length=20)
    type = models.CharField(max_length=20)

    class Meta:
        db_table = 'participant'
        ordering = ['-type', 'order']

    def __str__(self):
        """Render as tab-separated name, role and the ``' | '``-joined institution names."""
        # publication_institutions is a reverse relation declared outside this class.
        institution_names = ' | '.join(
            institution.publication_institution_name
            for institution in self.publication_institutions.all()
        )
        return '%s\t%s\t%s' % (self.name, self.role, institution_names)


class Annotation(models.Model):
    """Links a user to a :class:`Document` with start/finish timestamps and a ``finished`` flag.

    ``start_time`` is set automatically when the row is created.
    """
    document = models.ForeignKey(Document, related_name='annotations', on_delete=models.CASCADE)
    user = models.ForeignKey(User, related_name='annotations', on_delete=models.CASCADE)
    start_time = models.DateTimeField(auto_now_add=True)
    finish_time = models.DateTimeField(blank=True, null=True)
    finished = models.BooleanField(default=False)

    def __str__(self):
        """Render as ``<username>-<document pk>``."""
        username = self.user.username
        document_pk = self.document.pk
        return '%s-%s' % (username, document_pk)