unmerge.py
8.26 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
#!/usr/bin/env python
# -*- Mode: Python; tab-width: 4; indent-tabs-mode: nil; coding: utf-8; -*-
# vim:set ft=python ts=4 sw=4 sts=4 autoindent:
from __future__ import with_statement
'''
Split merged BioNLP Shared Task annotations into separate files.
Author: Sampo Pyysalo
Version: 2011-02-24
'''
import sys
import re
try:
    import argparse
except ImportError:
    from os.path import basename, dirname
    from os.path import join as join_path
    from sys import path as sys_path
    # We are most likely on an old Python and need to use our internal version.
    # The bundled copy is looked up relative to the directory containing this
    # script (dirname, not basename: basename would yield the file name, and
    # the resulting path would never exist).
    sys_path.append(join_path(dirname(__file__), '../server/lib'))
    import argparse
# If True, performs extra checking to ensure that the input and output
# contain the same data (see verify_split). This costs a bit of execution
# time.
DEBUG=True
class ArgumentError(Exception):
    """Error in the arguments given to the script.

    The message is stored in ``errstr`` and rendered by ``str()`` with an
    "Argument error: " prefix.
    """

    def __init__(self, s):
        # Initialise the base class as well, so that ``args`` and ``repr()``
        # carry the message instead of being empty.
        Exception.__init__(self, s)
        self.errstr = s

    def __str__(self):
        # One-element tuple: a bare ``% self.errstr`` would mis-format (or
        # raise TypeError) if the message happened to be a tuple.
        return 'Argument error: %s' % (self.errstr,)
class SyntaxError(Exception):
    """Error in the format of an annotation line.

    NOTE: this class intentionally keeps its original name, which shadows the
    builtin ``SyntaxError`` within this module.

    Attributes:
        line: the offending annotation line, stored exactly as given.
        errstr: description of the problem, or None.
        line_num: line number as a string, or "(undefined)" if not given.
    """

    def __init__(self, line, errstr=None, line_num=None):
        # Initialise the base class so that ``args`` is populated.
        Exception.__init__(self, line, errstr, line_num)
        self.line = line
        self.errstr = errstr
        self.line_num = str(line_num) if line_num is not None else "(undefined)"

    def __str__(self):
        if self.errstr is not None:
            detail = ": " + self.errstr
        else:
            detail = ""
        # Lines read from a file still carry their line terminator; drop it
        # for display so that the message stays on a single line.
        shown = self.line
        if hasattr(shown, "rstrip"):
            shown = shown.rstrip("\r\n")
        return 'Syntax error on line %s ("%s")%s' % (self.line_num, shown, detail)
class ProcessingError(Exception):
    """Failure while processing a file; raised by verify_split on a mismatch."""
class Annotation(object):
    """One line of BioNLP ST-flavoured standoff together with its type.

    The original text is kept verbatim in ``ann_string`` (and returned by
    ``str()``); the parsed type is available as ``type``.
    """

    # Special value to use as the type for comment annotations.
    COMMENT_TYPE = "<COMMENT>"

    # Type name expected at the start of the second TAB-separated field.
    _typere = re.compile(r'^([a-zA-Z][a-zA-Z0-9_-]*)\b')

    def __init__(self, s):
        self.ann_string = s
        self.type = Annotation._parse_type(s)

    def __str__(self):
        return self.ann_string

    @staticmethod
    def _parse_type(s):
        """
        Determine the type of the standoff annotation line `s`.

        Lines starting with '#' are comments and yield COMMENT_TYPE. Lines
        starting with an alphanumeric character or '*' yield the type name
        found at the start of their second TAB-separated field. Anything
        else raises SyntaxError.
        """
        if not s or s[0].isspace():
            raise SyntaxError(s, "ID missing")

        lead = s[0]
        if lead == '#':
            # comment; any structure allowed. return special type
            return Annotation.COMMENT_TYPE
        if not (lead.isalnum() or lead == '*'):
            raise SyntaxError(s, "Unrecognized ID")

        # Possible "standard" ID. Assume type can be found
        # in second TAB-separated field.
        columns = s.split("\t")
        if len(columns) < 2:
            raise SyntaxError(s, "No TAB in annotation")
        found = Annotation._typere.search(columns[1])
        if found is None:
            raise SyntaxError(s, "Failed to parse type in \"%s\"" % columns[1])
        return found.group(1)
def argparser():
    """Build and return the command-line argument parser for this script."""
    parser = argparse.ArgumentParser(
        description="Split merged BioNLP ST annotations into separate files.")

    # Which annotation types go to which output file.
    parser.add_argument(
        "-a1", "--a1types",
        default="Protein",
        metavar="TYPE[,TYPE...]",
        help="Annotation types to place into .a1 file")
    parser.add_argument(
        "-a2", "--a2types",
        default="[OTHER]",
        metavar="TYPE[,TYPE...]",
        help="Annotation types to place into .a2 file")

    parser.add_argument(
        "-d", "--directory",
        default=None,
        metavar="DIR",
        help="Output directory")

    # TODO: don't clobber existing files
    #parser.add_argument("-f", "--force", default=False, action="store_true", help="Force generation even if output files exist")

    parser.add_argument(
        "-s", "--skipempty",
        default=False,
        action="store_true",
        help="Skip output for empty split files")
    parser.add_argument(
        "-i", "--idrewrite",
        default=False,
        action="store_true",
        help="Rewrite IDs following BioNLP ST conventions")

    parser.add_argument(
        "files",
        nargs='+',
        help="Files in merged BioNLP ST-flavored standoff")
    return parser
def parse_annotations(annlines, fn="(unknown)"):
    """
    Parse the given annotation lines into a list of Annotation objects.

    Lines that are empty or contain only whitespace are skipped, with a
    warning written to stderr.

    Arguments:
        annlines: iterable of annotation lines.
        fn: name of the file the lines came from; used in warnings only.

    Raises:
        SyntaxError (the class defined in this module) if a line cannot be
        parsed; the error carries the offending line and its line number.
    """
    annotations = []
    for idx, line in enumerate(annlines):
        # Report line numbers 1-based, the way editors count them.
        line_num = idx + 1
        if not line.strip():
            sys.stderr.write("Warning: ignoring empty line %d in %s\n" % (line_num, fn))
            continue
        try:
            annotations.append(Annotation(line))
        except SyntaxError as e:
            # Re-raise with the line number attached. The "as" form is valid
            # on Python 2.6+ as well as Python 3.
            raise SyntaxError(line, e.errstr, line_num)
    return annotations
# Type-map key standing for "any type that is not explicitly listed".
DEFAULT_TYPE = "<DEFAULT>"


def split_annotations(annotations, typemap):
    """
    Group the given annotations by destination.

    `typemap` maps annotation types to destinations. Each annotation goes
    to the destination of its own type or, when its type is not listed, to
    the destination of DEFAULT_TYPE. Returns a dict mapping each
    destination that received at least one annotation to the list of its
    annotations, in input order.

    Raises ArgumentError if an annotation's type is not in `typemap` and
    `typemap` has no DEFAULT_TYPE entry.
    """
    buckets = {}
    for ann in annotations:
        key = ann.type
        if key not in typemap:
            if DEFAULT_TYPE not in typemap:
                raise ArgumentError("Don't know where to place annotation of type '%s'" % ann.type)
            key = DEFAULT_TYPE
        buckets.setdefault(typemap[key], []).append(ann)
    return buckets
def type_mapping(arg):
    """
    Generates a mapping from types to filename suffixes
    based on the given arguments.

    Reads the comma-separated type lists `arg.a1types` and `arg.a2types`.
    Whitespace around each type name is ignored and empty entries (e.g.
    from a trailing comma) are skipped. The special name "[OTHER]" is
    mapped to DEFAULT_TYPE.

    Raises ArgumentError if the same type is assigned to two different
    suffixes.
    """
    m = {}
    # just support .a1 and .a2 now
    for suff, typestr in (("a1", arg.a1types),
                          ("a2", arg.a2types)):
        for ts in typestr.split(","):
            # Tolerate "A, B" and "A,B,": without this, " B" or "" would be
            # registered as a type name that no annotation can match.
            ts = ts.strip()
            if not ts:
                continue
            # default arg
            t = ts if ts != "[OTHER]" else DEFAULT_TYPE
            # Repeating a type for the same suffix is redundant, not
            # ambiguous; only conflicting assignments are an error.
            if t in m and m[t] != suff:
                raise ArgumentError("Split for '%s' ambiguous (%s or %s); check arguments." % (ts, m[t], suff))
            m[t] = suff
    return m
def output_file_name(fn, directory, suff):
    """
    Return the output path for input file `fn` and suffix `suff`.

    The result is the base name of `fn` with its extension replaced by
    "." + `suff`, placed in `directory` or, if `directory` is empty or
    None, in the directory of `fn`.
    """
    import os.path

    source_dir, filename = os.path.split(fn)
    stem = os.path.splitext(filename)[0]
    # default to directory of file
    target_dir = directory if directory else source_dir
    return os.path.join(target_dir, stem + "." + suff)
def annotation_lines(annotations):
    """Return a list with the str() form of each of the given annotations."""
    return list(map(str, annotations))
def write_annotation_lines(fn, lines):
    """
    Write the given lines to file `fn`, replacing any existing content.

    Lines are written as-is; no line terminators are added.
    """
    with open(fn, 'wt') as out:
        out.writelines(lines)
def read_annotation_lines(fn):
    """Return the lines of file `fn` as a list, line terminators included."""
    with open(fn) as source:
        return list(source)
def verify_split(origlines, splitlines):
    """
    Check that the split output contains the same lines as the input.

    Arguments:
        origlines: the lines of the original (merged) file.
        splitlines: dict whose values are the lists of lines of each
            split output.

    The comparison ignores line order. Every line found on one side only
    is reported on stderr, except that blank lines may be missing from
    the split.

    Raises:
        ProcessingError if any difference was reported.
    """
    orig = sorted(origlines)
    split = sorted(line for lines in splitlines.values() for line in lines)

    # Walk both sorted sequences in parallel, collecting the lines that
    # appear on one side only.
    orig_only = []
    split_only = []
    oi, si = 0, 0
    while oi < len(orig) and si < len(split):
        if orig[oi] == split[si]:
            oi += 1
            si += 1
        elif orig[oi] < split[si]:
            orig_only.append(orig[oi])
            oi += 1
        else:
            # Here split[si] < orig[oi]. (Each sequence must be indexed
            # with its own cursor; the cursors diverge as soon as one
            # difference has been seen.)
            split_only.append(split[si])
            si += 1
    # Whatever remains on either side has no counterpart on the other.
    orig_only.extend(orig[oi:])
    split_only.extend(split[si:])

    difference_found = False
    for l in split_only:
        sys.stderr.write("Split error: split contains extra line '%s'\n" % (l,))
        difference_found = True
    for l in orig_only:
        # allow blank lines to be removed
        if l.strip() == "":
            continue
        sys.stderr.write("Split error: split is missing line '%s'\n" % (l,))
        difference_found = True

    if difference_found:
        raise ProcessingError
def process_file(fn, typemap, directory, mandatory):
    """
    Split the merged annotation file `fn` into one output file per suffix.

    Arguments:
        fn: path of the merged annotation file.
        typemap: mapping from annotation types to output suffixes, as
            consumed by split_annotations.
        directory: output directory, passed to output_file_name.
        mandatory: suffixes for which an output file is written even if
            no annotation was assigned to it.

    When DEBUG is set, the split is checked with verify_split before any
    output file is written.
    """
    annlines = read_annotation_lines(fn)
    # Pass the file name along so that parser warnings name the file
    # instead of "(unknown)".
    annotations = parse_annotations(annlines, fn)

    splitann = split_annotations(annotations, typemap)

    # always write these, even if they will be empty
    for suff in mandatory:
        splitann.setdefault(suff, [])

    splitlines = dict((suff, annotation_lines(anns))
                      for suff, anns in splitann.items())

    if DEBUG:
        verify_split(annlines, splitlines)

    for suff, lines in splitlines.items():
        write_annotation_lines(output_file_name(fn, directory, suff), lines)
def main(argv=None):
    """
    Command-line entry point.

    Arguments:
        argv: argument list including the program name; defaults to
            sys.argv.

    Returns:
        2 if the type arguments are inconsistent, 0 otherwise. Files that
        fail with IOError or SyntaxError (the class defined in this
        module) are reported on stderr and skipped. Any other exception is
        announced on stderr and re-raised.

    NOTE: the --idrewrite option is parsed but not read here.
    """
    if argv is None:
        argv = sys.argv
    arg = argparser().parse_args(argv[1:])

    # The "except ... as" form below is valid on Python 2.6+ and Python 3.
    try:
        typemap = type_mapping(arg)
    except ArgumentError as e:
        sys.stderr.write("%s\n" % (e,))
        return 2

    if arg.skipempty:
        mandatory_outputs = []
    else:
        mandatory_outputs = ["a1", "a2"]

    for fn in arg.files:
        try:
            process_file(fn, typemap, arg.directory, mandatory_outputs)
        except (IOError, SyntaxError) as e:
            sys.stderr.write("Error: failed %s, skip processing (%s)\n" % (fn, e))
        except:
            # Deliberately bare: name the file being processed, then let the
            # original exception propagate unchanged.
            sys.stderr.write("Fatal: unexpected error processing %s\n" % (fn,))
            raise

    return 0
# When run as a script, use the return value of main() as the exit status.
if __name__ == "__main__":
    sys.exit(main())