data_utils.py
6.05 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
from collections import defaultdict
from itertools import chain
import numpy as np
import tensorflow as tf
from datasets.features import ClassLabel
from .constants import MASK_VALUE, FIRST, LAST, SEG_BEGIN, SEG_INSIDE, TOKENS, SEG, LEMMAS
# https://huggingface.co/docs/transformers/v4.23.1/en/tasks/token_classification
def masked_word_ids(word_ids, masking_strategy=FIRST):
    """Keep one word id per word and replace every other position with None.

    The label of a word is attached to exactly one of its tokens (the first
    or the last one, depending on ``masking_strategy``); all remaining
    positions are masked.

    Args:
        word_ids: Sequence of per-token word indices. ``None`` marks a token
            that belongs to no word (e.g. a special token).
        masking_strategy: ``FIRST`` keeps the id on the first token of each
            run of equal word ids, ``LAST`` keeps it on the last token.

    Returns:
        A list holding the word id at the kept positions and ``None``
        elsewhere. For any other strategy nothing is emitted for non-``None``
        positions, so callers are expected to validate the strategy first.
    """
    masked = []
    last_position = len(word_ids) - 1
    for i, word_idx in enumerate(word_ids):
        if word_idx is None:
            # Tokens outside any word are always masked.
            masked.append(None)
        elif masking_strategy == FIRST:
            # Only look back when there is a previous token: word_ids[-1]
            # would silently wrap around to the end of the sequence.
            previous = word_ids[i - 1] if i > 0 else None
            masked.append(word_idx if word_idx != previous else None)
        elif masking_strategy == LAST:
            # The final token has no successor; indexing i + 1 there would
            # raise IndexError.
            following = word_ids[i + 1] if i < last_position else None
            masked.append(word_idx if word_idx != following else None)
    return masked
def _align_row(values, masked_word_ids):
    """Pick ``values[idx]`` for every index in ``masked_word_ids``.

    Positions whose index is ``None`` receive ``MASK_VALUE`` instead.
    """
    aligned = []
    for word_idx in masked_word_ids:
        if word_idx is not None:
            aligned.append(values[word_idx])
        else:
            aligned.append(MASK_VALUE)
    return aligned
def _align_example(example, masked_ids):
    """Align every label column of ``example`` with ``masked_ids``.

    The columns ``'id'``, ``TOKENS`` and ``LEMMAS`` are skipped. A column
    whose first element is iterable is treated as a matrix (one row per
    word): rows and columns are both re-indexed through ``masked_ids``.
    Any other column is treated as a flat list with one value per word.

    Args:
        example: Mapping from column name to the column's values.
        masked_ids: One entry per output position: a word index, or ``None``
            for positions that must be filled with ``MASK_VALUE``.

    Returns:
        A ``defaultdict(list)`` mapping each processed column name to its
        aligned values.
    """
    labels = defaultdict(list)
    # Row used for output positions that do not correspond to any word.
    masked_row = [MASK_VALUE for _ in masked_ids]
    for column_name in list(example.keys()):
        if column_name in ('id', TOKENS, LEMMAS):
            continue
        values = example[column_name]
        # An empty column has no first element to inspect; treat it as a
        # flat column instead of failing with IndexError.
        matrix = len(values) > 0 and hasattr(values[0], '__iter__')
        if matrix:
            aligned_labels = [
                _align_row(values[idx], masked_ids) if idx is not None else masked_row
                for idx in masked_ids
            ]
        else:
            aligned_labels = _align_row(values, masked_ids)
        labels[column_name] = aligned_labels
    return labels
def bert_tokenize_and_align(example, tokenizer, masking_strategy=FIRST):
    """Tokenize ``example[TOKENS]`` and align the example's labels with it.

    Args:
        example: Mapping of column names to values; ``example[TOKENS]`` is
            passed to the tokenizer as pre-split words.
        tokenizer: Callable invoked with ``truncation=True`` and
            ``is_split_into_words=True``; its result must offer
            ``word_ids()`` and ``update()``.
        masking_strategy: ``FIRST`` or ``LAST``; selects which token of a
            word carries the word's label.

    Returns:
        The tokenizer output, updated with the aligned label columns.

    Raises:
        RuntimeError: If ``masking_strategy`` is neither ``FIRST`` nor
            ``LAST``.
    """
    if masking_strategy not in (FIRST, LAST):
        raise RuntimeError(f'Uknown masking strategy: {masking_strategy}')

    encoding = tokenizer(
        example[TOKENS],
        truncation=True,
        is_split_into_words=True,
    )
    # One entry per produced token: the labelled word's index or None.
    label_mask = masked_word_ids(encoding.word_ids(), masking_strategy)
    encoding.update(_align_example(example, label_mask))
    return encoding
def morf_tokenize(text, m):
    """Split ``text`` into the unit-length segments reported by ``m``.

    ``m.analyse(text)`` must yield ``(start, end, interp)`` triples, where
    ``interp[0]`` is the surface form covering the span ``start``-``end``.

    Returns:
        The surface forms of the spans ``(0, 1), (1, 2), ...`` up to the
        largest end index seen; an empty list when nothing was analysed.
    """
    surface_by_span = {}
    last_node = 0
    for start, end, interp in m.analyse(text):
        span = (start, end)
        surface = interp[0]
        if span not in surface_by_span:
            surface_by_span[span] = surface
            if end > last_node:
                last_node = end
        else:
            # All interpretations of one span must share a surface form.
            assert (surface == surface_by_span[span])
    return [surface_by_span[(node, node + 1)] for node in range(last_node)]
def morf_tokenize_and_align(example, morfeusz, masking_strategy=FIRST):
    """Re-tokenize ``example[TOKENS]`` with ``morfeusz`` and align labels.

    Every original token is split with ``morf_tokenize``. The resulting
    segments become the new ``TOKENS`` column, and ``SEG`` records whether a
    segment begins a token (``SEG_BEGIN``) or continues it (``SEG_INSIDE``).
    Only the first segment of a token keeps the token's labels.

    Args:
        example: Mapping of column names to values.
        morfeusz: Analyser object handed to ``morf_tokenize``.
        masking_strategy: Must be ``FIRST``.

    Returns:
        A ``defaultdict(list)`` with the new ``TOKENS`` and ``SEG`` columns
        plus the aligned label columns.

    Raises:
        RuntimeError: If the strategy is unknown, or is ``LAST``.
    """
    if masking_strategy not in (FIRST, LAST):
        raise RuntimeError(f'Uknown masking strategy: {masking_strategy}')
    if masking_strategy == LAST:
        raise RuntimeError(f'Can’t use {masking_strategy} masking strategy with retokenize')

    labels = defaultdict(list)
    mask = []
    for word_idx, word in enumerate(example[TOKENS]):
        segments = morf_tokenize(word, morfeusz)
        for position, segment in enumerate(segments):
            starts_word = position == 0
            labels[TOKENS].append(segment)
            if starts_word:
                labels[SEG].append(SEG_BEGIN)
                mask.append(word_idx)
            else:
                labels[SEG].append(SEG_INSIDE)
                mask.append(None)
    labels.update(_align_example(example, mask))
    return labels
# Exact Python/NumPy types that are always treated as floating point.
FLOAT = {float, np.float64}


def _has_float(values):
    """Return True if ``values`` (possibly nested lists/tuples) holds a float."""
    for value in values:
        if isinstance(value, (list, tuple)):
            # Matrix-shaped labels: look inside each row as well.
            if _has_float(value):
                return True
        elif type(value) in FLOAT or isinstance(value, np.floating):
            return True
    return False


def _dtype(v):
    """Choose the tensor dtype for one batch column.

    Only the first example ``v[0]`` is inspected. The column is converted as
    ``tf.float64`` when that example contains a Python or NumPy float, also
    inside nested rows, and as ``tf.int64`` otherwise.
    """
    return tf.float64 if _has_float(v[0]) else tf.int64
# based on transformers.data.data_collator.DataCollatorForTokenClassification
class DataCollator(object):
    """Pad a list of tokenized examples and convert them to tensors.

    Args:
        tokenizer: Object providing ``pad(features, padding=True)`` and a
            ``padding_side`` attribute.
        categories: Names of batch columns holding one label per token.
        categories2d: Names of batch columns holding one row of labels per
            token (a matrix per example).
    """

    def __init__(self, tokenizer, categories, categories2d):
        self.tokenizer = tokenizer
        self.categories = categories
        self.categories2d = categories2d

    def _pad_labels(self, labels, sequence_length):
        """Pad ``labels`` with ``MASK_VALUE`` up to ``sequence_length``.

        The padding goes on the side given by ``tokenizer.padding_side``.
        """
        padding = [MASK_VALUE] * (sequence_length - len(labels))
        if self.tokenizer.padding_side == "right":
            return list(labels) + padding
        return padding + list(labels)

    def _pad_matrix(self, rows, sequence_length):
        """Pad a label matrix to ``sequence_length`` x ``sequence_length``.

        Each row is padded like a flat label list, then fully masked rows
        are added. The extra rows follow ``tokenizer.padding_side`` so that
        row positions stay in step with the padded token positions.
        """
        padded_rows = [self._pad_labels(row, sequence_length) for row in rows]
        filler_rows = [
            self._pad_labels([], sequence_length)
            for _ in range(sequence_length - len(rows))
        ]
        if self.tokenizer.padding_side == "right":
            return padded_rows + filler_rows
        return filler_rows + padded_rows

    def __call__(self, features):
        """Collate ``features`` into a dict of tensors.

        Returns:
            A dict mapping every batch column to a tensor whose dtype is
            chosen by ``_dtype``.

        Raises:
            Whatever ``tf.convert_to_tensor`` or ``_dtype`` raises; the name
            and content of the failing column are printed first.
        """
        batch = self.tokenizer.pad(
            features,
            padding=True,
        )
        sequence_length = tf.convert_to_tensor(batch['input_ids']).shape[1]
        for category in self.categories:
            batch[category] = [
                self._pad_labels(lbl, sequence_length) for lbl in batch[category]
            ]
        for category in self.categories2d:
            batch[category] = [
                self._pad_matrix(lbl, sequence_length) for lbl in batch[category]
            ]

        # Convert each column exactly once; report the failing column and
        # re-raise the original exception unchanged.
        tensors = {}
        for key, value in batch.items():
            try:
                tensors[key] = tf.convert_to_tensor(value, dtype=_dtype(value))
            except Exception:
                print(f'Could not convert batch column {key!r} to a tensor: {value!r}')
                raise
        return tensors
class LabelDict(object):
    """Tag <-> id lookup tables for a list of label features.

    Args:
        feature_names: Names of the label columns, in the order used for
            the lookup tables.
        dataset_features: Mapping from column name to an object whose
            ``feature`` attribute describes the column's elements.

    Attributes are documented inline in ``__init__``.
    """

    def __init__(self, feature_names, dataset_features):
        unique_tags = []
        for name in feature_names:
            feature = dataset_features[name].feature
            # isinstance (rather than an exact type comparison) also accepts
            # subclasses of ClassLabel.
            if isinstance(feature, ClassLabel):
                unique_tags.append(feature.names)
            else:
                # Columns that are not class labels get a single placeholder tag.
                unique_tags.append(['VALUE'])

        # Column names, kept in the caller's order.
        self.feature_names = feature_names
        # Number of label columns.
        self.n = len(feature_names)
        # Per column: tag -> integer id (position in the tag list).
        self.tag2id = [
            {tag: idx for idx, tag in enumerate(tags)}
            for tags in unique_tags
        ]
        # Per column: integer id -> tag (inverse of tag2id).
        self.id2tag = [
            {idx: tag for tag, idx in t2idx.items()}
            for t2idx in self.tag2id
        ]
def dict_to_tensors(d):
    """Return a new dict with every value of ``d`` converted to a tensor."""
    tensors = {}
    for key, value in d.items():
        tensors[key] = tf.convert_to_tensor(value)
    return tensors