# evaluation.py 15 KB
# -*- coding:utf-8 -*-

import codecs
import datetime
import itertools
import os
import random
import time

from django.core.management.base import BaseCommand
from django.db.models import Count

from normalization import normalize
from verification.duckduckgo import DuckDuckGo
from webapp.models import Expression, Meaning, Source
from settings import PROJECT_PATH

# Key of the Source record(s) whose data is evaluated.
SOURCE = 'szarada'

# The date stamp is resolved once so that every report path built below
# carries the same suffix, even when the module is imported around midnight.
_DATE_STAMP = datetime.datetime.now().strftime('%Y%m%d')


def _eval_path(prefix):
    """Return the path of the ``<prefix>_eval-<SOURCE>-<date>.csv`` report in ``PROJECT_PATH/data``."""
    return os.path.join(PROJECT_PATH, 'data',
                        '%s_eval-%s-%s.csv' % (prefix, SOURCE, _DATE_STAMP))


MEANINGS_PATH = _eval_path('meanings')
EXPRESSIONS_PATH = _eval_path('expressions')
NORMALIZATION_PATH = _eval_path('normalization')
AMBIG_NORMALIZATION_PATH = _eval_path('ambig_normalization')
PARTICIPLE_NORMALIZATION_PATH = _eval_path('participle_normalization')


# Thresholds handed to DuckDuckGo(min_hits); every report gets one result
# column per threshold.
MIN_HITS = [1, 2, 3, 5, 10, 15]
# Upper bound on the number of items sampled per source.
SAMPLE = 200


class Command(BaseCommand):
    """Management command that runs one of the evaluation routines of this module."""

    help = 'Evaluate normalization and extraction.'

    def handle(self, *args, **options):
        """Run the currently enabled evaluation; ``args`` and ``options`` are not used."""
        # The other evaluations are kept here, disabled, so that they can be
        # switched on by hand:
        # evaluate_meanings(MEANINGS_PATH)
        # evaluate_expressions(EXPRESSIONS_PATH)
        # evaluate_normalization(NORMALIZATION_PATH, False)
        # evaluate_participle_expressions(PARTICIPLE_NORMALIZATION_PATH)

        evaluate_normalization(AMBIG_NORMALIZATION_PATH, ambiguity=True)


def evaluate_meanings(eval_path):
    """Sample catchword/definition pairs and check them at every hit threshold.

    For every ``Source`` whose key equals ``SOURCE``, up to ``SAMPLE``
    meanings that have both a catchword expression and a non-catchword
    expression are drawn at random.  One random catchword and one random
    definition of each drawn meaning are checked for every threshold in
    ``MIN_HITS`` and the results are written as tab-separated rows.

    :param eval_path: path of the UTF-8 report file to create (overwritten).
    """
    # codecs.open() always opens the file in binary mode and appends 'b' to
    # the mode itself, so the mode must not contain 't'.  The ``with`` block
    # closes (and flushes) the report even if the evaluation is interrupted.
    with codecs.open(eval_path, 'w', 'utf-8') as csv_file:
        for source in Source.objects.filter(key=SOURCE):
            print(source)
            csv_file.write(u'\n%s\n' % source.name)
            csv_file.write(u'ID\tHasło\tDefinicja')
            for min_hits in MIN_HITS:
                csv_file.write('\t%d' % min_hits)
            csv_file.write(u'\tCzłowiek\n')

            meanings = Meaning.objects.filter(expressions__link__source=source)
            random_pks_list = []
            evaluation = {}
            while len(random_pks_list) < SAMPLE and meanings.count() > 0:
                random_meaning = random.choice(meanings)
                # Remove the candidate from the pool whether or not it
                # qualifies; otherwise the loop never ends once only
                # non-qualifying meanings are left.
                meanings = meanings.exclude(pk=random_meaning.pk)

                catchwords = random_meaning.expressions.filter(is_catchword=True)
                definitions = random_meaning.expressions.exclude(is_catchword=True)
                if not (catchwords.exists() and definitions.exists()):
                    continue

                random_pks_list.append(random_meaning.pk)
                evaluation[random_meaning.pk] = {'random_catchword': random.choice(catchwords),
                                                 'random_definition': random.choice(definitions)}
                # -1 marks a threshold that has not been checked yet.
                for min_hits in MIN_HITS:
                    evaluation[random_meaning.pk][min_hits] = -1

            for min_hits in MIN_HITS:
                random_meanings = Meaning.objects.filter(pk__in=random_pks_list)
                # check_meanings() returns the meanings whose check has to be
                # repeated; keep going until none are left.
                while random_meanings:
                    random_meanings = check_meanings(evaluation, min_hits, random_meanings)

            write_meanings_evaluation(evaluation, csv_file)


def check_meanings(evaluation, min_hits, meanings):
    """Check each sampled catchword/definition pair at one hit threshold.

    :param evaluation: dict keyed by meaning pk; every entry holds the sampled
        ``'random_catchword'`` and ``'random_definition'`` and receives the
        ``'definition'`` text plus the 0/1 result stored under ``min_hits``.
    :param min_hits: threshold handed to ``DuckDuckGo``.
    :param meanings: iterable of meanings to check.
    :return: list of the meanings whose check raised ``RuntimeError`` and
        therefore has to be repeated.
    """
    check_again_meanings = []
    duckduckgo = DuckDuckGo(min_hits)

    for meaning in meanings:
        entry = evaluation[meaning.pk]
        random_catchword = entry['random_catchword']
        random_definition = entry['random_definition']

        if is_already_false(entry):
            # A result of 0 is already recorded for this entry, so nothing is
            # sent to the search engine and no pause is needed.
            confirmed = False
        else:
            # Random pause before every query (presumably to avoid being
            # rate-limited by the search engine).
            time.sleep(random.uniform(2.0, 4.0))
            try:
                confirmed = duckduckgo.check_entry(random_catchword, random_definition)
            except RuntimeError:
                print('Try again:\t', random_catchword.orth_text, u'\t-->\t', random_definition.orth_text)
                check_again_meanings.append(meaning)
                continue

        label = 'OK:\t' if confirmed else 'Erase:\t'
        print(label, random_catchword.orth_text, u'\t-->\t', random_definition.orth_text)
        entry['definition'] = u'%s\t%s' % (random_catchword.orth_text,
                                           random_definition.orth_text)
        entry[min_hits] = 1 if confirmed else 0

    return check_again_meanings


def is_already_false(evaluation):
    """Return True when any value stored in ``evaluation`` equals 0.

    :param evaluation: dict describing one evaluated item.
    :return: ``True`` if at least one value compares equal to 0, else ``False``.
    """
    return any(value == 0 for value in evaluation.values())


def write_meanings_evaluation(evaluation, csv_file):
    """Write one tab-separated row per evaluated meaning to ``csv_file``.

    A row consists of the meaning pk, the text stored under ``'definition'``
    and the value recorded for every threshold in ``MIN_HITS``.

    :param evaluation: dict keyed by meaning pk.
    :param csv_file: writable text file object.
    """
    for pk, result in evaluation.items():
        cells = [u'%d\t%s' % (pk, result['definition'])]
        cells.extend('\t%d' % result[threshold] for threshold in MIN_HITS)
        cells.append(u'\n')
        csv_file.write(u''.join(cells))


def evaluate_expressions(eval_path):
    """Sample multi-segment expressions and check them at every hit threshold.

    For every ``Source`` whose key equals ``SOURCE``, up to ``SAMPLE``
    expressions with more than one segment are drawn at random, checked for
    every threshold in ``MIN_HITS`` and written as tab-separated rows.

    :param eval_path: path of the UTF-8 report file to create (overwritten).
    """
    # codecs.open() always opens the file in binary mode and appends 'b' to
    # the mode itself, so the mode must not contain 't'.  The ``with`` block
    # closes (and flushes) the report even if the evaluation is interrupted.
    with codecs.open(eval_path, 'w', 'utf-8') as csv_file:
        for source in Source.objects.filter(key=SOURCE):
            print(source)
            csv_file.write(u'\n%s\n' % source.name)
            csv_file.write(u'ID\tWyrażenie')
            for min_hits in MIN_HITS:
                csv_file.write('\t%d' % min_hits)
            csv_file.write(u'\tCzłowiek\n')

            expressions = Expression.objects.filter(link__source=source)
            expressions = expressions.annotate(num_segments=Count('segments'))
            expressions = expressions.filter(num_segments__gt=1)

            random_pks_list = []
            evaluation = {}
            while len(random_pks_list) < SAMPLE and expressions.count() > 0:
                random_expression = random.choice(expressions)
                print(random_expression)
                # Sampling without replacement: drop the pick from the pool.
                expressions = expressions.exclude(pk=random_expression.pk)
                random_pks_list.append(random_expression.pk)
                # -1 marks a threshold that has not been checked yet.
                evaluation[random_expression.pk] = dict((min_hits, -1) for min_hits in MIN_HITS)

            for min_hits in MIN_HITS:
                random_expressions = Expression.objects.filter(pk__in=random_pks_list)
                # check_expressions() returns the expressions whose check has
                # to be repeated; keep going until none are left.
                while random_expressions:
                    random_expressions = check_expressions(evaluation, min_hits, random_expressions)

            write_expressions_evaluation(evaluation, csv_file)


def check_expressions(evaluation, min_hits, random_expressions):
    """Check each sampled expression at one hit threshold.

    :param evaluation: dict keyed by expression pk; every entry receives the
        ``'expression'`` text plus the 0/1 result stored under ``min_hits``.
    :param min_hits: threshold handed to ``DuckDuckGo``.
    :param random_expressions: iterable of expressions to check.
    :return: list of the expressions whose check raised ``RuntimeError`` and
        therefore has to be repeated.
    """
    check_again_expressions = []
    duckduckgo = DuckDuckGo(min_hits)

    for expression in random_expressions:
        entry = evaluation[expression.pk]

        if is_already_false(entry):
            # A result of 0 is already recorded for this entry, so nothing is
            # sent to the search engine and no pause is needed.
            confirmed = False
        else:
            # Random pause before every query (presumably to avoid being
            # rate-limited by the search engine).
            time.sleep(random.uniform(2.0, 4.0))
            try:
                confirmed = duckduckgo.check_expression(expression)
            except RuntimeError:
                print('Try again:\t', expression.orth_text)
                check_again_expressions.append(expression)
                continue

        label = 'OK:\t' if confirmed else 'Erase:\t'
        print(label, expression.orth_text)
        entry['expression'] = u'%s' % expression.orth_text
        entry[min_hits] = 1 if confirmed else 0

    return check_again_expressions


def write_expressions_evaluation(evaluation, csv_file):
    """Write one tab-separated row per evaluated expression to ``csv_file``.

    A row consists of the expression pk, the text stored under
    ``'expression'`` and the value recorded for every threshold in
    ``MIN_HITS``.

    :param evaluation: dict keyed by expression pk.
    :param csv_file: writable text file object.
    """
    for pk, outcome in evaluation.items():
        row = u'%d\t%s' % (pk, outcome['expression'])
        for threshold in MIN_HITS:
            row += '\t%d' % outcome[threshold]
        csv_file.write(row + u'\n')


def evaluate_normalization(eval_path, ambiguity):
    """Sample normalized forms of expressions and check them at every hit threshold.

    For every ``Source`` whose key equals ``SOURCE``, multi-segment
    expressions are drawn at random until ``SAMPLE`` of them have at least
    one normalized form (or the pool is exhausted).  One random normalized
    form per expression is checked for every threshold in ``MIN_HITS`` and
    the results are written as tab-separated rows.

    :param eval_path: path of the UTF-8 report file to create (overwritten).
    :param ambiguity: passed through to ``get_normalized_expression``.
    """
    # codecs.open() always opens the file in binary mode and appends 'b' to
    # the mode itself, so the mode must not contain 't'.  The ``with`` block
    # closes (and flushes) the report even if the evaluation is interrupted.
    with codecs.open(eval_path, 'w', 'utf-8') as csv_file:
        for source in Source.objects.filter(key=SOURCE):
            print(source)
            csv_file.write(u'\n%s\n' % source.name)
            csv_file.write(u'ID\tWyrażenie bazowe\tNormalizacja')
            for min_hits in MIN_HITS:
                csv_file.write('\t%d' % min_hits)
            csv_file.write(u'\tCzłowiek\n')

            expressions = Expression.objects.filter(link__source=source)
            expressions = expressions.annotate(num_segments=Count('segments'))
            expressions = expressions.filter(num_segments__gt=1)

            random_pks_list = []
            evaluation = {}
            while len(random_pks_list) < SAMPLE and expressions.count() > 0:
                random_expression = random.choice(expressions)
                normalized_expressions = get_normalized_expression(random_expression, ambiguity)
                # The pick leaves the pool whether or not it can be used.
                expressions = expressions.exclude(pk=random_expression.pk)
                if len(normalized_expressions) == 0:
                    continue

                print(random_expression)
                random_pks_list.append(random_expression.pk)
                evaluation[random_expression.pk] = {'random_normalization': random.choice(normalized_expressions)}
                # -1 marks a threshold that has not been checked yet.
                for min_hits in MIN_HITS:
                    evaluation[random_expression.pk][min_hits] = -1

            for min_hits in MIN_HITS:
                random_expressions = Expression.objects.filter(pk__in=random_pks_list)
                # check_normalizations() returns the expressions whose check
                # has to be repeated; keep going until none are left.
                while random_expressions:
                    random_expressions = check_normalizations(evaluation, min_hits, random_expressions)

            write_normalizations_evaluation(evaluation, csv_file)


def check_normalizations(evaluation, min_hits, random_expressions):
    """Check the sampled normalized form of each expression at one hit threshold.

    :param evaluation: dict keyed by expression pk; every entry holds the
        sampled ``'random_normalization'`` and receives the ``'expression'``
        text (original and normalized, tab-separated) plus the 0/1 result
        stored under ``min_hits``.
    :param min_hits: threshold handed to ``DuckDuckGo``.
    :param random_expressions: iterable of expressions to check.
    :return: list of the expressions whose check raised ``RuntimeError`` and
        therefore has to be repeated.
    """
    check_again_expressions = []
    duckduckgo = DuckDuckGo(min_hits)

    for expression in random_expressions:
        entry = evaluation[expression.pk]
        normalized_expr = entry['random_normalization']
        normalized_orth = normalize.get_normalized_expr_text(expression, normalized_expr)

        if is_already_false(entry):
            # A result of 0 is already recorded for this entry, so nothing is
            # sent to the search engine and no pause is needed.
            confirmed = False
        else:
            # Random pause before every query (presumably to avoid being
            # rate-limited by the search engine).
            time.sleep(random.uniform(2.0, 4.0))
            try:
                confirmed = duckduckgo.check_expression(expression, normalized_expr)
            except RuntimeError:
                print('Try again:\t', expression.orth_text, '!!', normalized_orth)
                check_again_expressions.append(expression)
                continue

        label = 'OK:\t' if confirmed else 'Erase:\t'
        print(label, expression.orth_text, '!!', normalized_orth)
        entry['expression'] = u'%s\t%s' % (expression.orth_text, normalized_orth)
        entry[min_hits] = 1 if confirmed else 0

    return check_again_expressions


def get_normalized_expression(expression, ambiguity):
    """Collect the generated forms whose text differs from the original expression.

    Nothing is collected unless the expression has more than one segment and
    ``generate_forms`` yields more than one form.  A form is skipped when its
    normalized text equals the original ``orth_text`` once spaces are removed
    from both; duplicates are kept out of the result.

    :param expression: expression to normalize.
    :param ambiguity: passed through to ``generate_forms``.
    :return: list of distinct accepted forms, in generation order.
    """
    original_compact = expression.orth_text.replace(' ', '')
    accepted = []

    if not expression.segments.count() > 1:
        return accepted

    candidates = generate_forms(expression, ambiguity)
    if not len(candidates) > 1:
        return accepted

    for candidate in candidates:
        candidate_text = normalize.get_normalized_expr_text(expression, candidate)
        if candidate_text.replace(' ', '') == original_compact:
            # Identical to the original apart from spacing.
            continue
        if candidate in accepted:
            continue
        accepted.append(candidate)

    return accepted


def generate_forms(expression, ambiguity):
    """Generate candidate forms of ``expression``.

    Every segment, taken in ``position_in_expr`` order, contributes a list of
    alternatives: its synonymic equivalents when ``ambiguity`` is true or
    when ``normalize.get_lus`` reports exactly one entry for the segment
    base, and only its own ``orth`` otherwise.  The result is the Cartesian
    product of these alternatives, followed by whatever
    ``normalize.create_pariciple_expressions`` derives from that product.

    :param expression: expression whose segments are expanded.
    :param ambiguity: when true, equivalents are used for every segment
        regardless of how many entries ``normalize.get_lus`` reports.
    :return: list of generated forms; the product entries are tuples with one
        item per segment.
    """
    alternatives = []
    for seg in expression.segments.order_by('position_in_expr'):
        seg_lus = normalize.get_lus(seg.base)
        # ``ambiguity`` is tested first so that count() is only called when
        # its result actually decides the branch.
        if ambiguity or seg_lus.count() == 1:
            # Head segments are treated like every other segment; the
            # head-specific variant is disabled:
            # head_equivalents = normalize.get_head_equivalents(seg)
            alternatives.append(normalize.get_synonymic_equivalents(seg))
        else:
            alternatives.append([seg.orth])

    generated_expressions = list(itertools.product(*alternatives))
    participle_expressions = normalize.create_pariciple_expressions(generated_expressions, expression)
    generated_expressions.extend(participle_expressions)

    return generated_expressions


def write_normalizations_evaluation(evaluation, csv_file):
    """Write one tab-separated row per evaluated normalization to ``csv_file``.

    A row consists of the expression pk, the text stored under
    ``'expression'`` and the value recorded for every threshold in
    ``MIN_HITS``.

    :param evaluation: dict keyed by expression pk.
    :param csv_file: writable text file object.
    """
    for pk in evaluation:
        record = evaluation[pk]
        head = u'%d\t%s' % (pk, record['expression'])
        scores = ['\t%d' % record[threshold] for threshold in MIN_HITS]
        csv_file.write(head + u''.join(scores) + u'\n')


def evaluate_participle_expressions(eval_path):
    """Sample participle rewrites of expressions and check them at every hit threshold.

    For every ``Source`` whose key equals ``SOURCE``, multi-segment
    expressions containing a segment with base ``który`` are drawn at random
    until ``SAMPLE`` of them yield at least one form from
    ``normalize.create_pariciple_expressions`` (or the pool is exhausted).
    One random form per expression is checked for every threshold in
    ``MIN_HITS`` and the results are written as tab-separated rows.

    :param eval_path: path of the UTF-8 report file to create (overwritten).
    """
    # codecs.open() always opens the file in binary mode and appends 'b' to
    # the mode itself, so the mode must not contain 't'.  The ``with`` block
    # closes (and flushes) the report even if the evaluation is interrupted.
    with codecs.open(eval_path, 'w', 'utf-8') as csv_file:
        for source in Source.objects.filter(key=SOURCE):
            print(source)
            csv_file.write(u'\n%s\n' % source.name)
            csv_file.write(u'ID\tWyrażenie bazowe\tNormalizacja')
            for min_hits in MIN_HITS:
                csv_file.write('\t%d' % min_hits)
            csv_file.write(u'\tCzłowiek\n')

            expressions = Expression.objects.filter(link__source=source)
            expressions = expressions.annotate(num_segments=Count('segments'))
            expressions = expressions.filter(num_segments__gt=1)
            expressions = expressions.filter(segments__base=u'który')

            random_pks_list = []
            evaluation = {}
            while len(random_pks_list) < SAMPLE and expressions.count() > 0:
                random_expression = random.choice(expressions)
                expression_segments = [seg.orth for seg in random_expression.segments.order_by('position_in_expr')]
                normalized_expressions = normalize.create_pariciple_expressions([expression_segments],
                                                                                random_expression)

                # The pick leaves the pool whether or not it can be used.
                expressions = expressions.exclude(pk=random_expression.pk)
                if len(normalized_expressions) == 0:
                    continue

                print(random_expression)
                random_pks_list.append(random_expression.pk)
                evaluation[random_expression.pk] = {'random_normalization': random.choice(normalized_expressions)}
                if not MIN_HITS:
                    # Without thresholds check_normalizations() never runs, so
                    # the row text it would normally store is filled in here.
                    normalized_expr = evaluation[random_expression.pk]['random_normalization']
                    normalized_orth = normalize.get_normalized_expr_text(random_expression, normalized_expr)
                    evaluation[random_expression.pk]['expression'] = u'%s\t%s' % (random_expression.orth_text,
                                                                                  normalized_orth)

            for min_hits in MIN_HITS:
                random_expressions = Expression.objects.filter(pk__in=random_pks_list)
                # check_normalizations() returns the expressions whose check
                # has to be repeated; keep going until none are left.
                while random_expressions:
                    random_expressions = check_normalizations(evaluation, min_hits, random_expressions)

            write_normalizations_evaluation(evaluation, csv_file)