# data_utils.py
from collections import defaultdict

import numpy as np
import tensorflow as tf

from datasets.features import ClassLabel

from .constants import MASK_VALUE, FIRST, LAST, SEG_BEGIN, SEG_INSIDE, TOKENS, SEG, LEMMAS

# https://huggingface.co/docs/transformers/v4.23.1/en/tasks/token_classification

def masked_word_ids(word_ids, masking_strategy=FIRST):
    masked = []
    for i, word_idx in enumerate(word_ids):
        # Keep the label on the first/last sub-token of each word and mask:
        #   * special tokens (word id = None)
        #   * the remaining sub-tokens of a word
        # The i - 1 / i + 1 lookups are safe because BERT-style tokenizers
        # emit special tokens (word id None) at both ends of the sequence.
        if word_idx is None:
            masked.append(None)
        elif masking_strategy == FIRST:
            masked.append(word_idx if word_idx != word_ids[i - 1] else None)
        elif masking_strategy == LAST:
            masked.append(word_idx if word_idx != word_ids[i + 1] else None)
    return masked
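
# Illustrative example (the word_ids layout is an assumption about a fast
# tokenizer's output: None for special tokens, the middle word split into
# two sub-tokens):
#
#   masked_word_ids([None, 0, 1, 1, 2, None], FIRST)  ->  [None, 0, 1, None, 2, None]
#   masked_word_ids([None, 0, 1, 1, 2, None], LAST)   ->  [None, 0, None, 1, 2, None]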

def _align_row(values, masked_ids):
    # Project word-level values onto sub-token positions; masked positions
    # (None) become MASK_VALUE so the loss can ignore them.
    return [MASK_VALUE if idx is None else values[idx] for idx in masked_ids]

def _align_example(example, masked_ids):

    labels = {}
    masked_row = [MASK_VALUE] * len(masked_ids)

    for column_name in example.keys():
        if column_name in ('id', TOKENS, LEMMAS):
            continue
        values = example[column_name]
        # A column is two-dimensional (e.g. a token-by-token matrix) if its
        # elements are themselves sequences.
        matrix = hasattr(values[0], '__iter__')
        if matrix:
            aligned_labels = [_align_row(values[idx], masked_ids) if idx is not None else masked_row
                              for idx in masked_ids]
        else:
            aligned_labels = _align_row(values, masked_ids)
        labels[column_name] = aligned_labels

    return labels
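
# Illustrative sketch (the column name 'upos' is hypothetical): for a 1-D
# column {'upos': [3, 7, 5]} and the FIRST mask [None, 0, 1, None, 2, None]
# from the example above, the aligned labels are
# [MASK_VALUE, 3, 7, MASK_VALUE, 5, MASK_VALUE]; a 2-D column (e.g. a
# word-by-word dependency matrix) is aligned the same way along both axes,
# with fully masked rows inserted at masked positions.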
    

def bert_tokenize_and_align(example, tokenizer, masking_strategy=FIRST):
    
    if masking_strategy not in (FIRST, LAST):
        raise RuntimeError(f'Unknown masking strategy: {masking_strategy}')
    
    tokenized_inputs = tokenizer(example[TOKENS], truncation=True, is_split_into_words=True)
    word_ids = tokenized_inputs.word_ids()
    mask = masked_word_ids(word_ids, masking_strategy)
    labels = _align_example(example, mask)
    tokenized_inputs.update(labels)
    return tokenized_inputs
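
# Usage sketch (the checkpoint name is an assumption; word_ids() is only
# available on *fast* tokenizers):
#
#   from transformers import AutoTokenizer
#   tokenizer = AutoTokenizer.from_pretrained('bert-base-multilingual-cased')
#   tokenized = dataset.map(lambda ex: bert_tokenize_and_align(ex, tokenizer))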

def morf_tokenize(text, morfeusz):
    # Morfeusz's analyse() returns the edges of a segmentation graph as
    # (start_node, end_node, (orth, lemma, tag, ...)) tuples.  Record the
    # surface form of each edge, then read the tokens off the single-step
    # edges (i, i + 1) along the path 0 -> 1 -> ... -> max_j.
    segs = dict()
    max_j = 0
    for i, j, interp in morfeusz.analyse(text):
        orth = interp[0]
        if (i, j) in segs:
            assert orth == segs[(i, j)]
        else:
            segs[(i, j)] = orth
        max_j = max(max_j, j)
    return [segs[(i, i + 1)] for i in range(max_j)]
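
# Usage sketch (assumes the morfeusz2 Python bindings):
#
#   import morfeusz2
#   morf = morfeusz2.Morfeusz()
#   morf_tokenize('miałem', morf)  # e.g. ['miał', 'em'] for the agglutinate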

def morf_tokenize_and_align(example, morfeusz, masking_strategy=FIRST):
    
    if masking_strategy not in (FIRST, LAST):
        raise RuntimeError(f'Unknown masking strategy: {masking_strategy}')
    if masking_strategy == LAST:
        raise RuntimeError(f'Cannot use {masking_strategy} masking strategy with retokenization')
    
    labels = defaultdict(list)
    
    mask = []
    for i, token in enumerate(example[TOKENS]):
        for j, morf_token in enumerate(morf_tokenize(token, morfeusz)):
            labels[TOKENS].append(morf_token)
            labels[SEG].append(SEG_BEGIN if j == 0 else SEG_INSIDE)
            mask.append(i if j == 0 else None)
    
    labels.update(_align_example(example, mask))
    return labels

FLOAT = {float, np.float64}

def _dtype(v):
    # Choose the tensor dtype by inspecting the element types of the first
    # row only; columns are assumed to be type-homogeneous.
    return tf.float64 if FLOAT.intersection(map(type, v[0])) else tf.int64

# based on transformers.data.data_collator.DataCollatorForTokenClassification
class DataCollator(object):
    
    def __init__(self, tokenizer, categories, categories2d):
        self.tokenizer = tokenizer
        self.categories = categories
        self.categories2d = categories2d
    
    def _pad_labels(self, labels, sequence_length):
        if self.tokenizer.padding_side == "right":
            return list(labels) + [MASK_VALUE] * (sequence_length - len(labels))
        else:
            return [MASK_VALUE] * (sequence_length - len(labels)) + list(labels)

    def __call__(self, features):

        batch = self.tokenizer.pad(
            features,
            padding=True,
        )
        sequence_length = tf.convert_to_tensor(batch['input_ids']).shape[1]
        for category in self.categories:
            padded_labels = [self._pad_labels(lbl, sequence_length) for lbl in batch[category]]
            batch[category] = padded_labels
        for category in self.categories2d:
            # pad the matrix rows to sequence_length
            # and add empty rows to pad to obtain a sequence_length x sequence_length matrix
            padded_labels = [
                [
                    self._pad_labels(row, sequence_length) for row in lbl
                ] + [
                    self._pad_labels([], sequence_length) for _ in range(sequence_length - len(lbl))
                ]
                for lbl in batch[category]
            ]
            batch[category] = padded_labels
        tensors = {}
        for k, v in batch.items():
            try:
                tensors[k] = tf.convert_to_tensor(v, dtype=_dtype(v))
            except Exception as e:
                # Re-raise with the offending column so conversion failures
                # are easy to track down.
                raise RuntimeError(
                    f'Cannot convert column {k!r} to a tensor '
                    f'(first-row element types: {list(map(type, v[0]))})'
                ) from e
        return tensors
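
# Usage sketch (column names are assumptions; to_tf_dataset comes from the
# `datasets` library):
#
#   collator = DataCollator(tokenizer, categories=['upos'], categories2d=['head'])
#   tf_dataset = tokenized_dataset.to_tf_dataset(
#       columns=['input_ids', 'attention_mask', 'upos', 'head'],
#       batch_size=32,
#       collate_fn=collator,
#   )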

class LabelDict(object):
    
    def __init__(self, feature_names, dataset_features):
        unique_tags = []
        for c in feature_names:
            feature = dataset_features[c].feature
            if isinstance(feature, ClassLabel):
                unique_tags.append(feature.names)
            else:
                # Non-categorical features get a single placeholder tag.
                unique_tags.append(['VALUE'])
        assert len(unique_tags) == len(feature_names)
        self.feature_names = feature_names
        self.n = len(feature_names)
        self.tag2id = [
            {tag: idx for idx, tag in enumerate(tags)}
            for tags in unique_tags
        ]
        self.id2tag = [
            {idx: tag for tag, idx in t2idx.items()}
            for t2idx in self.tag2id
        ]
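
# Usage sketch (the feature name is an assumption):
#
#   label_dict = LabelDict(['upos'], dataset['train'].features)
#   noun_id = label_dict.tag2id[0]['NOUN']
#   assert label_dict.id2tag[0][noun_id] == 'NOUN'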

def dict_to_tensors(d):
    return {k: tf.convert_to_tensor(v) for k, v in d.items()}