BIOtoStandoff.py
7.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
#!/usr/bin/env python
# Script to convert a column-based BIO-formatted entity-tagged file
# into standoff with reference to the original text.
from __future__ import with_statement
import sys
import re
import os
import codecs
class taggedEntity:
    """One standoff entity annotation over a reference text."""

    def __init__(self, startOff, endOff, eType, idNum, fullText):
        # Character offsets into fullText delimiting the entity span.
        self.startOff, self.endOff = startOff, endOff
        # Entity type label and numeric annotation ID ("T<idNum>").
        self.eType, self.idNum = eType, idNum
        self.fullText = fullText
        # Text actually covered by the span, cached for output and checks.
        self.eText = fullText[startOff:endOff]

    def __str__(self):
        # standoff textbound line: "T<id>\t<type> <start> <end>\t<text>"
        span = "%s %d %d" % (self.eType, self.startOff, self.endOff)
        return "T%d\t%s\t%s" % (self.idNum, span, self.eText)

    def check(self):
        # sanity checks: the string should not contain newlines and
        # should be minimal wrt surrounding whitespace
        text = self.eText
        assert text.find("\n") == -1, \
            "ERROR: newline in entity: '%s'" % text
        assert text == text.strip(), \
            "ERROR: entity contains extra whitespace: '%s'" % text
def BIO_to_standoff(BIOtext, reftext, tokenidx=2, tagidx=-1):
    """Split the BIO text into lines and delegate to BIO_lines_to_standoff()."""
    return BIO_lines_to_standoff(BIOtext.split('\n'), reftext, tokenidx, tagidx)
# Running counter for "T<N>" annotation IDs, shared across calls so that
# repeated invocations (e.g. one per tag column) never reuse an ID.
next_free_id_idx = 1

def BIO_lines_to_standoff(BIOlines, reftext, tokenidx=2, tagidx=-1):
    """Align tab-separated BIO-tagged lines with reftext and build entities.

    BIOlines: list of strings; each non-blank line is a tab-separated
        record with the token text in field `tokenidx` and its BIO tag
        (e.g. "O", "B-Protein", "I-Protein") in field `tagidx`.
    reftext: the original text the tokens were drawn from; alignment
        skips whitespace in reftext between tokens.
    Returns a list of taggedEntity objects with offsets into reftext.
    Raises AssertionError on tag parse failures or alignment mismatch,
    IndexError when a requested field is missing.
    """
    global next_free_id_idx

    taggedTokens = []

    ri, bi = 0, 0
    while ri < len(reftext):
        if bi >= len(BIOlines):
            sys.stderr.write("Warning: received BIO didn't cover given text\n")
            break

        BIOline = BIOlines[bi]

        if re.match(r'^\s*$', BIOline):
            # the BIO has an empty line (sentence split); skip
            bi += 1
        else:
            # assume tagged token in BIO. Parse and verify
            fields = BIOline.split('\t')

            try:
                tokentext = fields[tokenidx]
            except IndexError:
                sys.stderr.write("Error: failed to get token text "
                                 "(field %d) on line: %s\n" % (tokenidx, BIOline))
                raise

            try:
                tag = fields[tagidx]
            except IndexError:
                # BUGFIX: message previously said "token text" here too.
                sys.stderr.write("Error: failed to get token tag "
                                 "(field %d) on line: %s\n" % (tagidx, BIOline))
                raise

            # split tag into "B"/"I"/"O" and optional "-<TYPE>" suffix
            m = re.match(r'^([BIO])((?:-[A-Za-z0-9_-]+)?)$', tag)
            assert m, "ERROR: failed to parse tag '%s'" % tag
            ttag, ttype = m.groups()

            # strip off starting "-" from tagged type
            if len(ttype) > 0 and ttype[0] == "-":
                ttype = ttype[1:]

            # sanity check: "O" carries no type, "B"/"I" must carry one
            assert ((ttype == "" and ttag == "O") or
                    (ttype != "" and ttag in ("B", "I"))), \
                "Error: tag/type mismatch %s" % tag

            # go to the next token on reference; skip whitespace
            while ri < len(reftext) and reftext[ri].isspace():
                ri += 1

            # verify that the text matches the original
            assert reftext[ri:ri + len(tokentext)] == tokentext, \
                "ERROR: text mismatch: reference '%s' tagged '%s'" % \
                (reftext[ri:ri + len(tokentext)].encode("UTF-8"),
                 tokentext.encode("UTF-8"))

            # store tagged token as (begin, end, tag, tagtype) tuple.
            taggedTokens.append((ri, ri + len(tokentext), ttag, ttype))

            # skip the processed token
            ri += len(tokentext)
            bi += 1

            # ... and skip whitespace on reference
            while ri < len(reftext) and reftext[ri].isspace():
                ri += 1

    # if the remaining part of either the reference or the tagged
    # contains nonspace characters, something's wrong
    if (len([c for c in reftext[ri:] if not c.isspace()]) != 0 or
            len([c for c in BIOlines[bi:] if not re.match(r'^\s*$', c)]) != 0):
        assert False, "ERROR: failed alignment: '%s' remains in reference, " \
            "'%s' in tagged" % (reftext[ri:], BIOlines[bi:])

    standoff_entities = []

    # cleanup for tagger errors where an entity begins with an
    # "I" tag instead of a "B" tag
    revisedTagged = []
    prevTag = None
    for startoff, endoff, ttag, ttype in taggedTokens:
        if prevTag == "O" and ttag == "I":
            sys.stderr.write("Note: rewriting \"I\" -> \"B\" after \"O\"\n")
            ttag = "B"
        revisedTagged.append((startoff, endoff, ttag, ttype))
        prevTag = ttag
    taggedTokens = revisedTagged

    # cleanup for tagger errors where an entity switches type
    # without a "B" tag at the boundary
    revisedTagged = []
    prevTag, prevType = None, None
    for startoff, endoff, ttag, ttype in taggedTokens:
        if prevTag in ("B", "I") and ttag == "I" and prevType != ttype:
            sys.stderr.write("Note: rewriting \"I\" -> \"B\" at type switch\n")
            ttag = "B"
        revisedTagged.append((startoff, endoff, ttag, ttype))
        prevTag, prevType = ttag, ttype
    taggedTokens = revisedTagged

    prevTag, prevEnd = "O", 0
    currType, currStart = None, None
    for startoff, endoff, ttag, ttype in taggedTokens:
        if prevTag != "O" and ttag != "I":
            # previous entity does not continue into this tag; output
            # BUGFIX: messages below used undefined name `fn`, which
            # turned a failed assertion into a NameError.
            assert currType is not None and currStart is not None, \
                "ERROR: open entity is missing its type or start offset"
            standoff_entities.append(taggedEntity(currStart, prevEnd, currType,
                                                  next_free_id_idx, reftext))
            next_free_id_idx += 1

            # reset current entity
            currType, currStart = None, None

        elif prevTag != "O":
            # previous entity continues; just check sanity
            assert ttag == "I", "ERROR: unexpected tag '%s' inside entity" % ttag
            assert currType == ttype, "ERROR: entity of type '%s' continues " \
                "as type '%s'" % (currType, ttype)

        if ttag == "B":
            # new entity starts
            currType, currStart = ttype, startoff

        prevTag, prevEnd = ttag, endoff

    # if there's an open entity after all tokens have been processed,
    # we need to output it separately
    if prevTag != "O":
        standoff_entities.append(taggedEntity(currStart, prevEnd, currType,
                                              next_free_id_idx, reftext))
        next_free_id_idx += 1

    for e in standoff_entities:
        e.check()

    return standoff_entities
# Matches "START-END" range specifications, allowing negative endpoints.
RANGE_RE = re.compile(r'^(-?\d+)-(-?\d+)$')

def parse_indices(idxstr):
    """Parse a comma-separated index specification into a list of ints.

    Accepts forms like "4,5" and "6,8-11". A "START-END" range is
    inclusive of both endpoints, so "8-11" yields [8, 9, 10, 11].
    Raises ValueError when a piece is neither an integer nor a range.
    """
    indices = []
    for piece in idxstr.split(','):
        m = RANGE_RE.match(piece)
        if m is None:
            indices.append(int(piece))
        else:
            start, end = m.groups()
            # BUGFIX: range() excludes its upper bound, so the original
            # range(int(start), int(end)) silently dropped the last
            # index of every START-END range.
            indices.extend(range(int(start), int(end) + 1))
    return indices
def main(argv):
    """Command-line entry point: convert BIOFILE to standoff over TEXTFILE.

    argv: [prog, TEXTFILE, BIOFILE, TOKENIDX?, BIOIDX?], where TOKENIDX
    is the tab-separated field holding the token text and BIOIDX selects
    the tag field(s) -- a single index or a spec such as "4,5"/"6,8-11".
    Prints one standoff line per entity to stdout.
    Returns 0 on success, 1 on usage or argument errors.
    """
    if len(argv) < 3 or len(argv) > 5:
        sys.stderr.write("Usage: %s TEXTFILE BIOFILE [TOKENIDX [BIOIDX]]\n"
                         % argv[0])
        return 1

    textfn, biofn = argv[1], argv[2]

    tokenIdx = None
    if len(argv) >= 4:
        tokenIdx = int(argv[3])
    bioIdx = None
    if len(argv) >= 5:
        bioIdx = argv[4]

    # NOTE(review): 'rU' requests Python 2 universal-newline mode; the
    # 'U' flag was removed in Python 3.11 -- drop it if this script is
    # ever moved to modern Python.
    with open(textfn, 'rU') as textf:
        text = textf.read()

    with open(biofn, 'rU') as biof:
        bio = biof.read()

    if tokenIdx is None:
        so = BIO_to_standoff(bio, text)
    elif bioIdx is None:
        so = BIO_to_standoff(bio, text, tokenIdx)
    else:
        try:
            indices = parse_indices(bioIdx)
        except ValueError:
            # BUGFIX: was a bare "except:", which also swallowed
            # SystemExit/KeyboardInterrupt; bad specs surface as ValueError.
            sys.stderr.write('Error: failed to parse indices "%s"\n' % bioIdx)
            return 1
        so = []
        for i in indices:
            so.extend(BIO_to_standoff(bio, text, tokenIdx, i))

    for s in so:
        sys.stdout.write("%s\n" % s)

    return 0
# Allow use both as an importable module (for BIO_to_standoff etc.) and
# as a command-line script; exit status comes from main().
if __name__ == "__main__":
    sys.exit(main(sys.argv))