# database.py (14.1 KB)
# -*- coding: utf-8 -*-
import time
from datetime import date, datetime
from collections import defaultdict

from xml.etree.ElementTree import Element, ElementTree, tostring
import xml.etree.ElementTree as ET

class Database:
    """XML-backed registry of files and the annotation work done on them.

    The document root holds one child element per file.  The methods below
    read and write these children of a file element:

    * ``name``     -- the file name; required, used as the index key,
    * ``addDate``  -- written by add(),
    * ``rejected`` -- present once the file was rejected, text is the reason,
    * ``ann``      -- one entry per annotator that checked the file out,
    * ``s_ann``    -- the adjudicator ("superannotator") entry.

    An ``ann`` / ``s_ann`` entry holds ``annName`` and ``checkoutDate`` and,
    depending on its state, ``checkinDate``, ``returnDate`` or a ``return``
    element with ``date``, ``reason`` and optionally ``fixed`` children.

    All timestamps are produced with ``time.strftime("%c")``.
    """

    def __init__(self, db, anno_per_file, from_file=True):
        """Load the XML document and build the lookup indexes.

        db            -- path of the XML file when ``from_file`` is true,
                         otherwise the XML document itself as a string.
        anno_per_file -- maximum number of ``ann`` entries per file.
        from_file     -- parse a file (True) or a string (False); a database
                         parsed from a string cannot be written by save().
        """
        if from_file:
            self.db_name = db
            self.tree = ET.parse(db)
            self.root = self.tree.getroot()
        else:
            self.db_name = None
            self.tree = None
            self.root = ET.fromstring(db)

        self.anno_per_file = anno_per_file
        self.make_index()

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _native(value):
        """Return ``value`` as the interpreter's native ``str`` type.

        Messages are assembled from ``str`` literals, so every piece has to
        be a native string: text is UTF-8 encoded where ``str`` is a byte
        string, bytes are UTF-8 decoded where ``str`` is text.  This avoids
        mixing bytes and text in one concatenation.
        """
        if isinstance(value, str):
            return value
        if isinstance(value, bytes):
            return value.decode("utf-8")
        return value.encode("utf-8")

    @staticmethod
    def _stamp(parent, tag):
        """Append a ``tag`` child holding the current time to ``parent``."""
        stamp_elem = ET.SubElement(parent, tag)
        stamp_elem.text = time.strftime("%c")
        return stamp_elem

    @staticmethod
    def _owner_names(ann_elems):
        """Return the ``annName`` text of every entry, in document order."""
        return [ann.find("annName").text for ann in ann_elems]

    def _cannot_upload(self, file_name, user):
        """Build (not raise) the error used when an upload is not allowed."""
        return Exception("Cannot upload " + self._native(file_name)
                         + " file by " + self._native(user) + " annotator !")

    def _record_return(self, ann_elem, reason):
        """Attach a ``return`` element (date + reason) to ``ann_elem``."""
        ret_elem = ET.SubElement(ann_elem, "return")
        self._stamp(ret_elem, "date")
        reason_elem = ET.SubElement(ret_elem, "reason")
        reason_elem.text = "" if reason is None else reason

    def _reopen(self, ann_elem):
        """Turn a fixed ``return`` into a ``returnDate`` and check out again."""
        ret_elem = ann_elem.find("return")
        returned_on = ret_elem.find("date").text
        ann_elem.remove(ret_elem)
        ET.SubElement(ann_elem, "returnDate").text = returned_on
        self._stamp(ann_elem, "checkoutDate")

    def _open_adjudication(self, file_name, user):
        """Return the ``s_ann`` entry ``user`` may still act on.

        Raises the "Cannot upload" Exception when the file has no ``s_ann``
        entry, when it belongs to somebody else, or when it is checked in.
        """
        file_elem = self.file_index[file_name]
        sann_elem = file_elem.find("s_ann")
        if (sann_elem is None
                or user != sann_elem.find("annName").text
                or self.finished(sann_elem)):
            raise self._cannot_upload(file_name, user)
        return sann_elem

    # ------------------------------------------------------------------
    # State predicates on ann / s_ann / file elements
    # ------------------------------------------------------------------

    def owned(self, ann_elem):
        """True when the entry is neither checked in nor returned."""
        return not self.finished(ann_elem) and not self.returned(ann_elem)

    def finished(self, ann_elem):
        """True when the entry has a ``checkinDate`` child."""
        return ann_elem.find("checkinDate") is not None

    def returned(self, ann_elem):
        """True when the entry has a ``return`` child."""
        return ann_elem.find("return") is not None

    def get_reason(self, ann_elem):
        """Return the text of ``return/reason``, or "" when it has no text.

        The entry must have a ``return`` element with a ``reason`` child;
        otherwise the lookup fails with AttributeError.
        """
        reason = ann_elem.find("return").find("reason").text
        return "" if reason is None else reason

    def rejected(self, file_elem):
        """True when the file element has a ``rejected`` child."""
        return file_elem.find("rejected") is not None

    def fixed(self, ann_elem):
        """True when the entry has a ``return`` element marked ``fixed``."""
        ret_elem = ann_elem.find("return")
        return ret_elem is not None and ret_elem.find("fixed") is not None

    # ------------------------------------------------------------------
    # Lookups inside one file element
    # ------------------------------------------------------------------

    def ann_elem_for_user(self, file_elem, user):
        """Return ``(ann element, zero-based position)`` of ``user``'s entry.

        The position counts ``ann`` children only.  Returns ``(None, None)``
        when the user has no ``ann`` entry on this file.
        """
        for idx, ann_elem in enumerate(file_elem.findall("ann")):
            if ann_elem.find("annName").text == user:
                return ann_elem, idx
        return None, None

    def sann_elem_for_user(self, file_elem, user):
        """Return ``user``'s ``s_ann`` entry on this file, or None."""
        for sann_elem in file_elem.findall("s_ann"):
            if sann_elem.find("annName").text == user:
                return sann_elem
        return None

    def fixed_for_user(self, file_elem, user, is_adj):
        """True when ``user``'s entry on this file is returned and fixed.

        is_adj -- look at ``s_ann`` entries (True) or ``ann`` entries (False).
        """
        if is_adj:
            entry = self.sann_elem_for_user(file_elem, user)
        else:
            entry, _ = self.ann_elem_for_user(file_elem, user)
        return entry is not None and self.fixed(entry)

    # ------------------------------------------------------------------
    # Indexing
    # ------------------------------------------------------------------

    def make_index(self):
        """Rebuild ``file_index`` and both ownership indexes from the tree.

        ``file_index`` maps a file name to its element (rejected files
        included).  ``ann_owned_index`` / ``adj_owned_index`` map a user name
        to the non-rejected file elements on which that user has an owned
        ``ann`` / ``s_ann`` entry.

        Raises Exception when a file element has no ``name`` child.
        """
        self.file_index = {}
        self.ann_owned_index = defaultdict(list)
        self.adj_owned_index = defaultdict(list)

        for file_elem in self.root:
            name_elem = file_elem.find("name")
            if name_elem is None:
                raise Exception("No name assigned to a file element !")
            self.file_index[name_elem.text] = file_elem

            if self.rejected(file_elem):
                continue

            for tag, index in (("ann", self.ann_owned_index),
                               ("s_ann", self.adj_owned_index)):
                for entry in file_elem.findall(tag):
                    if self.owned(entry):
                        index[entry.find("annName").text].append(file_elem)

    # ------------------------------------------------------------------
    # File-level mutations
    # ------------------------------------------------------------------

    def fix(self, ann_elem):
        """Mark a returned entry as fixed by adding ``return/fixed``.

        Raises Exception when the entry is not returned or already fixed.
        """
        if not self.returned(ann_elem):
            raise Exception("File not returned in database!")
        if self.fixed(ann_elem):
            raise Exception("File already fixed in database!")

        ET.SubElement(ann_elem.find("return"), "fixed")

    def add(self, file_name):
        """Append a new file element with ``name`` and ``addDate``.

        Raises Exception when the name is already indexed.
        """
        if file_name in self.file_index:
            raise Exception("File name already in database !")
        file_elem = ET.SubElement(self.root, "file")
        ET.SubElement(file_elem, "name").text = file_name
        self._stamp(file_elem, "addDate")
        # A new file has no ann/s_ann entries, so only file_index changes.
        self.file_index[file_name] = file_elem

    def remove(self, file_name):
        """Delete the file element and rebuild the indexes.

        Raises Exception when the name is not indexed.
        """
        if file_name not in self.file_index:
            raise Exception("File name not in database !")
        self.root.remove(self.file_index[file_name])
        self.make_index()

    def reject(self, file_name, reason):
        """Add a ``rejected`` child holding ``reason`` and rebuild the indexes.

        Raises Exception when the name is not indexed or already rejected.
        """
        if file_name not in self.file_index:
            raise Exception("File name not in database !")
        file_elem = self.file_index[file_name]
        if self.rejected(file_elem):
            raise Exception("File already rejected !")
        ET.SubElement(file_elem, "rejected").text = reason
        self.make_index()

    # ------------------------------------------------------------------
    # Persistence
    # ------------------------------------------------------------------

    # For prettyprint, used in 'save' method.
    def indent(self, elem, level=0):
        """Rewrite whitespace-only text/tail so the tree prints indented.

        Text or tails that contain non-whitespace characters are left alone.
        """
        pad = "\n" + level * "  "
        children = list(elem)
        if children:
            if not elem.text or not elem.text.strip():
                elem.text = pad + "  "
            if not elem.tail or not elem.tail.strip():
                elem.tail = pad
            for child in children:
                self.indent(child, level + 1)
            # The last child is followed by the parent's closing tag, so its
            # tail is pulled back to the parent's indentation level.
            last = children[-1]
            if not last.tail or not last.tail.strip():
                last.tail = pad
        elif level and (not elem.tail or not elem.tail.strip()):
            elem.tail = pad

    def save(self):
        """Pretty-print the tree and write it back to ``db_name`` as UTF-8.

        Raises Exception when the database was not loaded from a file.
        """
        if not self.db_name:
            raise Exception("XML not read from file !")

        self.indent(self.root)
        self.tree.write(self.db_name, encoding="utf-8")

    # ------------------------------------------------------------------
    # Upload / return
    # ------------------------------------------------------------------

    def upload_prevention(self, file_name, user, is_super):
        """Explain why ``user`` may not upload ``file_name``.

        is_super -- check the ``s_ann`` entry (True) or the user's ``ann``
                    entry (False).

        Returns a message string, or None when nothing prevents the upload.
        """
        shown = self._native(file_name)
        if file_name not in self.file_index:
            return "Filename " + shown + " not known by server."
        file_elem = self.file_index[file_name]
        if self.rejected(file_elem):
            return "File rejected: " + shown

        if is_super:
            sann_elem = file_elem.find("s_ann")
            if sann_elem is None:
                return "No superannotator assigned to file " + shown
            if user != sann_elem.find("annName").text:
                return "Different superannotator assigned to file " + shown
            if self.finished(sann_elem):
                return "File " + shown + " already checked in!"
            return None

        ann_elem, _ = self.ann_elem_for_user(file_elem, user)
        if ann_elem is None:
            return ("User " + self._native(user)
                    + " doesn't own the file " + shown)
        if self.finished(ann_elem):
            return ("User " + self._native(user)
                    + " already checked in the file " + shown)
        return None

    def upload_dest(self, file_name, user):
        """Return ``(ann element, zero-based position)`` for an upload.

        Raises Exception when ``user`` has no ``ann`` entry on the file or
        that entry is already checked in; KeyError for an unknown file name.
        """
        file_elem = self.file_index[file_name]
        ann_elem, idx = self.ann_elem_for_user(file_elem, user)
        if ann_elem is None or self.finished(ann_elem):
            raise self._cannot_upload(file_name, user)
        return (ann_elem, idx)

    def upload_id(self, file_name, user):
        """Return only the position part of upload_dest()."""
        return self.upload_dest(file_name, user)[1]

    def return_file(self, file_name, user, reason):
        """Record that the annotator returned the file; return the position.

        ``reason`` may be None, which is stored as an empty string.
        Raises like upload_dest().
        """
        ann_elem, idx = self.upload_dest(file_name, user)
        self._record_return(ann_elem, reason)
        return idx

    def return_file_prim(self, file_name, user, reason):
        """Record that the adjudicator returned the file.

        Raises Exception when the file has no ``s_ann`` entry, when it is
        assigned to somebody else, or when it is already checked in.
        """
        sann_elem = self._open_adjudication(file_name, user)
        self._record_return(sann_elem, reason)

    def upload(self, file_name, user):
        """Check in the annotator's entry; return its position.

        Raises like upload_dest().
        """
        ann_elem, idx = self.upload_dest(file_name, user)
        self._stamp(ann_elem, "checkinDate")
        return idx

    def upload_prim(self, file_name, user):
        """Check in the adjudicator's entry.

        Raises Exception when the file has no ``s_ann`` entry, when it is
        assigned to somebody else, or when it is already checked in.
        """
        sann_elem = self._open_adjudication(file_name, user)
        self._stamp(sann_elem, "checkinDate")

    # ------------------------------------------------------------------
    # Download (check-out)
    # ------------------------------------------------------------------

    def download(self, file_name, user):
        """Check the file out to ``user`` as an annotator.

        When the user's entry is returned-and-fixed it is re-opened and its
        position is returned.  Otherwise a new ``ann`` entry is created and
        None is returned.

        Raises Exception when the user already has an entry that is not
        fixed, or when the file already has ``anno_per_file`` entries.
        """
        file_elem = self.file_index[file_name]
        ann_elem, idx = self.ann_elem_for_user(file_elem, user)

        if ann_elem is not None and self.fixed(ann_elem):
            self._reopen(ann_elem)
            return idx

        taken = len(file_elem.findall("ann"))
        if ann_elem is not None or taken + 1 > self.anno_per_file:
            raise Exception("Cannot set " + self._native(user)
                            + " annotator to '" + self._native(file_name)
                            + "' file !")

        new_elem = ET.SubElement(file_elem, "ann")
        ET.SubElement(new_elem, "annName").text = user
        self._stamp(new_elem, "checkoutDate")
        return None

    def download_prim(self, file_name, user):
        """Check the file out to ``user`` as the adjudicator.

        Returns True when an existing returned-and-fixed ``s_ann`` entry was
        re-opened, False when a new entry was created.

        Raises Exception when the file already has an ``s_ann`` entry that
        cannot be re-opened by this user.
        """
        file_elem = self.file_index[file_name]
        sann_elem = self.sann_elem_for_user(file_elem, user)

        if sann_elem is not None and self.fixed(sann_elem):
            self._reopen(sann_elem)
            return True

        if file_elem.find("s_ann") is not None:
            raise Exception("Cannot set " + self._native(user)
                            + " adjudicator to '" + self._native(file_name)
                            + "' file !")

        new_elem = ET.SubElement(file_elem, "s_ann")
        ET.SubElement(new_elem, "annName").text = user
        self._stamp(new_elem, "checkoutDate")
        return False

    # ------------------------------------------------------------------
    # Queries
    # ------------------------------------------------------------------

    def for_annotation(self, user):
        """List file names ``user`` can annotate.

        Rejected files and files that already have an ``s_ann`` entry are
        skipped.  Returns ``(priority, result)``: ``priority`` holds files
        whose entry for this user is returned-and-fixed, ``result`` holds
        files with a free slot that the user has not taken yet.
        """
        priority = []
        result = []
        for file_elem in self.root:
            if self.rejected(file_elem) or file_elem.find("s_ann") is not None:
                continue

            owners = self._owner_names(file_elem.findall("ann"))
            if len(owners) < self.anno_per_file and user not in owners:
                result.append(file_elem.find("name").text)

            if self.fixed_for_user(file_elem, user, False):
                priority.append(file_elem.find("name").text)

        return priority, result

    def for_adjudication(self, user):
        """List file names ``user`` can adjudicate.

        Rejected files are skipped.  Returns ``(priority, result)``:
        ``priority`` holds files whose ``s_ann`` entry for this user is
        returned-and-fixed; ``result`` holds files that have exactly
        ``anno_per_file`` annotations, all checked in, none of them by this
        user, and no ``s_ann`` entry yet.
        """
        priority = []
        result = []
        for file_elem in self.root:
            if self.rejected(file_elem):
                continue

            anns = file_elem.findall("ann")
            owners = self._owner_names(anns)
            if (len(owners) == self.anno_per_file
                    and file_elem.find("s_ann") is None
                    and user not in owners
                    and all(self.finished(ann) for ann in anns)):
                result.append(file_elem.find("name").text)

            if self.fixed_for_user(file_elem, user, True):
                priority.append(file_elem.find("name").text)

        return priority, result

    def owns_normal(self, user):
        """Names of files on which ``user`` has an owned ``ann`` entry.

        Read from ``ann_owned_index``, i.e. the state at the last
        make_index() call; download/upload/return do not refresh it.
        """
        # .get() keeps the lookup from inserting an empty defaultdict entry.
        return [file_elem.find("name").text
                for file_elem in self.ann_owned_index.get(user, [])]

    def owns_super(self, user):
        """Names of files on which ``user`` has an owned ``s_ann`` entry.

        Read from ``adj_owned_index``, i.e. the state at the last
        make_index() call; download/upload/return do not refresh it.
        """
        # .get() keeps the lookup from inserting an empty defaultdict entry.
        return [file_elem.find("name").text
                for file_elem in self.adj_owned_index.get(user, [])]

    def owns(self, user):
        """Annotator-owned file names followed by adjudicator-owned ones."""
        return self.owns_normal(user) + self.owns_super(user)

    def finished_count(self, user):
        """Count ``user``'s checked-in, not-returned entries.

        Both ``ann`` and ``s_ann`` entries count; rejected files are skipped.
        """
        count = 0
        for file_elem in self.root:
            if self.rejected(file_elem):
                continue

            entries = file_elem.findall("ann") + file_elem.findall("s_ann")
            for entry in entries:
                if (entry.find("annName").text == user
                        and self.finished(entry)
                        and not self.returned(entry)):
                    count += 1

        return count