# -*- coding: iso-8859-1 -*-
import sys
import os
import glob
from tqdm import tqdm
import pickle
import random
import nltk
from HanTa import HanoverTagger as ht
from enum import Enum
from database_preparation.utils_stringpreparation import read_german_text
from database_preparation.utils_wordbase import RegexpReplacer, RegexpSynonyms
from database_preparation.stop_word_list import filter_stopwords
import json
import argparse
import pandas as pd

# Installed packages: nltk, HanTa, tqdm, numpy
#
# TODO: add custom preprocessing for short diagnosis texts:
#   - replace: [('\n', ' '), ('DMGS', 'DM GS'), ('FGFSGS', 'FG FSGS'), ('-', ' ')]
#   - remove: ['(schöner Fall)', 'mit', 'bei', 'nach', 'wohl', 'und']


########## define enums ##########

class LemmatizeMode(Enum):
    """Which tokens survive the tagging step and whether they are lemmatized."""
    lemma_only_nouns = 1
    lemma_only_nouns_adja = 2
    lemma = 3
    none = 4


class PunctuationMode(Enum):
    """How tokens listed in ``punctuations_to_remove`` are treated."""
    keep = 1
    remove = 2
    replace = 3


class NumberMode(Enum):
    """How numeric expressions (digits, ratios, ranges, units, ...) are treated."""
    keep = 1
    remove = 2
    replace = 3


########## define some symbols and lists ##########

num_replace_symbol = "*"
punct_replace_symbol = "--"
# '\u00b4\u00b4' is a doubled acute accent; it is written as an escape so the
# value does not depend on the encoding this file is saved in.
punctuations_to_remove = ['%', '=', '+', '-', '?', '<', '>', '\'', '``', '\'\'', ',', ';', '.',
                          '*', '#', '\u00b4\u00b4', '\\', '/', '(', ')', '[', ']', '{', '}', '~', ':']
do_not_lemma_list = ['igg', 'iga', 'igm']

# Parameter keys whose integer value is displayed as the name of an enum member.
_MODE_ENUMS = {
    'lemma_mode': LemmatizeMode,
    'punct_mode': PunctuationMode,
    'number_mode': NumberMode,
}


########## Functions ##########

def prepro_params_2_string(params):
    """Render the preprocessing parameters as one ``key: value`` line per entry.

    The values of ``lemma_mode``, ``punct_mode`` and ``number_mode`` are shown
    as the name of the matching enum member; every other value is passed
    through ``str``.

    :param params: mapping of parameter names to values.
    :return: the lines joined by newlines, without a trailing newline.
    :raises ValueError: if a mode value is not a valid member of its enum.
    """
    lines = []
    for key, value in params.items():
        mode_enum = _MODE_ENUMS.get(key)
        shown = mode_enum(value).name if mode_enum is not None else str(value)
        lines.append(f"{key}: {shown}")
    return '\n'.join(lines)


def get_metadata(path_to_pickled_prepro_text_list):
    """Load the JSON metadata stored next to a pickled preprocessed corpus.

    The metadata path is derived by replacing ``.pkl`` with ``_meta.json``.

    :param path_to_pickled_prepro_text_list: path of the pickle file.
    :return: the parsed JSON content, or ``None`` if the path is unusable or
        the file cannot be read or parsed (best effort, never raises for these).
    """
    try:
        meta_path = os.fspath(path_to_pickled_prepro_text_list).replace('.pkl', '_meta.json')
        with open(meta_path) as json_file:
            return json.load(json_file)
    except (OSError, TypeError, ValueError):
        # TypeError: not a str path; OSError: missing/unreadable file;
        # ValueError: invalid JSON or undecodable bytes.
        return None


def print_meta_data(path_to_pickled_prepro_text_list):
    """Print the metadata belonging to a pickled preprocessed corpus.

    :param path_to_pickled_prepro_text_list: path of the pickle file.
    :return: ``True`` if the metadata was found and printed, else ``False``.
    """
    params = get_metadata(path_to_pickled_prepro_text_list)
    if params is None:
        return False
    try:
        text = prepro_params_2_string(params)
    except (AttributeError, TypeError, ValueError):
        # The metadata is not a mapping or holds an invalid mode value.
        return False
    print(text)
    print()
    return True


def is_histo_num(word: str) -> bool:
    """Return True if *word* starts with ``h/`` (any case) followed by a digit."""
    # Slicing instead of indexing keeps the bare token "h/" from raising.
    return word[:2].lower() == "h/" and word[2:3].isdigit()


def is_date(word: str) -> bool:
    """Return True if *word* contains ``.20`` and both starts and ends with a digit."""
    return '.20' in word and word[0].isdigit() and word[-1].isdigit()


def get_corpus_stats(path2corpus):
    """Count the tokens of all ``*.txt`` files directly inside *path2corpus*.

    :param path2corpus: folder containing the text files.
    :return: dict with ``total_token_count``, ``amount_docs`` and
        ``tokens_per_doc`` (``0`` when the folder holds no ``.txt`` file).
    """
    file_list = glob.glob(path2corpus + '/*.txt')
    total_token_count = 0
    for t_file in tqdm(file_list):
        tokens = nltk.tokenize.word_tokenize(read_german_text(t_file), language='german')
        total_token_count += len(tokens)

    amount_docs = len(file_list)
    return {
        'total_token_count': total_token_count,
        'amount_docs': amount_docs,
        'tokens_per_doc': float(total_token_count) / float(amount_docs) if amount_docs else 0,
    }


def _collect_section_text(report, sections):
    """Concatenate the non-empty requested sections of a report, each preceded by a newline."""
    text = ""
    for section in sections:
        if section in report and report[section]:
            text += '\n' + report[section]
    return text


def _apply_lemma_mode(tagged_tokens, lemma_mode):
    """Reduce ``(word, lemma, pos)`` triples to a token list according to *lemma_mode*."""
    if lemma_mode == LemmatizeMode.lemma_only_nouns:
        return [lemma for (_, lemma, pos) in tagged_tokens if pos in ("NN", "NE")]
    if lemma_mode == LemmatizeMode.lemma_only_nouns_adja:
        return [lemma for (_, lemma, pos) in tagged_tokens if pos in ("NN", "NE", "ADJA")]
    if lemma_mode == LemmatizeMode.lemma:
        # Keep the surface form when the lemma is '--' or the word is on the
        # do-not-lemmatize list.
        return [word if lemma == '--' or word.lower() in do_not_lemma_list else lemma
                for (word, lemma, _) in tagged_tokens]
    return [word for (word, _, _) in tagged_tokens]


def _apply_punct_mode(tokens, punct_mode):
    """Remove or replace punctuation tokens according to *punct_mode*."""
    if punct_mode == PunctuationMode.remove:
        return [token for token in tokens if token not in punctuations_to_remove]
    if punct_mode == PunctuationMode.replace:
        return [punct_replace_symbol if token in punctuations_to_remove else token
                for token in tokens]
    return tokens


def _is_digit_pair(word, separator):
    """Return True if *word* is exactly two digit runs joined by *separator*."""
    parts = word.split(separator)
    return len(parts) == 2 and parts[0].isdigit() and parts[1].isdigit()


def _number_placeholder(word):
    """Return the verbose placeholder for a numeric expression, or None for other tokens.

    The checks are ordered; the first matching category decides, and a token
    that enters the ',', '/' or '-' category without being a digit pair is
    treated as an ordinary word.
    """
    if not word:
        return None
    if word.isdigit():
        return '_zahl_'
    if ',' in word:  # decimal numbers like "0,3"
        return 'x,y' if _is_digit_pair(word, ',') else None
    starts_with_digit = word[0].isdigit()
    if starts_with_digit and word.endswith('nm'):  # distances like "500nm"
        return 'x_nm'
    if '/' in word:  # ratios like "6/10"
        return 'x/y' if _is_digit_pair(word, '/') else None
    if '-' in word:  # ranges like "5-10"
        return 'x-y' if _is_digit_pair(word, '-') else None
    if starts_with_digit and word[-1].lower() == 'x':  # "6x" ("six times ...")
        return 'x_mal'
    if starts_with_digit and word[-1] == '%':  # percentages
        return 'x_%'
    if '+' in word:  # sum expressions like "3+3+3=9/20"
        return '_summe_'
    if word in ('cm', 'mm'):  # bare distance units
        return '_distanz_'
    return None


def _filter_numbers(tokens, number_mode, use_single_symbol=True):
    """Drop dates and histo numbers, then keep, remove or replace numeric expressions.

    With *use_single_symbol* every replaced expression becomes
    ``num_replace_symbol``; otherwise a category-specific placeholder is used.
    """
    filtered = []
    for word in tokens:
        # Dates and histo numbers are removed regardless of number_mode.
        if is_histo_num(word) or is_date(word):
            continue
        if number_mode != NumberMode.keep:
            placeholder = _number_placeholder(word)
            if placeholder is not None:
                if number_mode == NumberMode.replace:
                    filtered.append(num_replace_symbol if use_single_symbol else placeholder)
                continue
        filtered.append(word)
    return filtered


def preprocess(parameter_dict):
    """Preprocess all JSON reports of a corpus folder.

    The folder ``parameter_dict['source_data']['path_to_dataset']`` is scanned
    for ``*.json`` files (in sorted order). For each report, the sections named
    in ``parameter_dict['source_data']['sections']`` are concatenated and run
    through: optional replacer, tokenization, optional stopword filter,
    optional combiner, tagging/lemmatization, punctuation filter, number
    filter, optional lower-casing. Reports without any of the requested
    sections are skipped. The original and processed text of at most one
    report (chosen at random among the first ten files) is printed as an
    example.

    Histo numbers and dates will always be removed!

    :param parameter_dict: preprocessing configuration with the keys
        ``source_data``, ``tokenized``, ``cased``, ``stopword_filtered``,
        ``use_combiner``, ``use_replacer``, ``lemma_mode``, ``punct_mode``
        and ``number_mode``.
    :return: tuple ``(preprocessed_corpus, corpus)`` of dicts keyed by the
        file name without extension; the first maps to the token list (or to
        one space-joined string if ``tokenized`` is false), the second to the
        unprocessed text. Both are empty if no ``.json`` file is found.
    :raises KeyError: if a configuration key is missing.
    :raises ValueError: if a mode value is not a valid enum member.
    """
    source_data_path = parameter_dict['source_data']['path_to_dataset']
    sections_to_preprocess = parameter_dict['source_data']['sections']
    do_tokenize = parameter_dict['tokenized']
    cased = parameter_dict['cased']
    stopword_filtered = parameter_dict['stopword_filtered']
    use_combiner = parameter_dict['use_combiner']
    use_replacer = parameter_dict['use_replacer']
    lemma_mode = LemmatizeMode(parameter_dict['lemma_mode'])
    punct_mode = PunctuationMode(parameter_dict['punct_mode'])
    number_mode = NumberMode(parameter_dict['number_mode'])

    # load the files in a sorted way:
    file_list = sorted(glob.glob(source_data_path + '/*.json'))

    preprocessed_corpus = {}
    corpus = {}
    if not file_list:
        # random.randrange(0) below would raise on an empty corpus.
        print(f"No .json files found in {source_data_path}, nothing to preprocess.")
        return preprocessed_corpus, corpus

    combiner = RegexpSynonyms()
    replacer = RegexpReplacer()
    tagger = ht.HanoverTagger('morphmodel_ger.pgz')

    random_example_idx = random.randrange(min(10, len(file_list)))

    for idx, t_file in enumerate(tqdm(file_list)):
        case_id = os.path.splitext(os.path.basename(t_file))[0]

        with open(t_file) as json_file:
            report = json.load(json_file)

        original_text = _collect_section_text(report, sections_to_preprocess)
        if not original_text:
            # The report contains none of the requested sections.
            continue

        t_text = original_text
        if use_replacer:
            t_text = replacer.replace(t_text)

        t_text = nltk.tokenize.word_tokenize(t_text, language='german')

        if stopword_filtered:
            t_text = filter_stopwords(t_text)

        if use_combiner:
            t_text = combiner.combine(t_text)

        t_text = _apply_lemma_mode(tagger.tag_sent(t_text), lemma_mode)
        t_text = _apply_punct_mode(t_text, punct_mode)
        t_text = _filter_numbers(t_text, number_mode)

        # always lower the text at the end, otherwise
        # case-sensitive operations might not work anymore!
        if not cased:
            t_text = [word.lower() for word in t_text]

        # re-join the tokens if the result should not be tokenized:
        if not do_tokenize:
            t_text = ' '.join(t_text)

        preprocessed_corpus[case_id] = t_text
        corpus[case_id] = original_text

        if idx == random_example_idx:
            print("-------------- Preprocessing Example: ---------------")
            print("Original text of " + t_file + ":")
            print(original_text)
            print("Processed text:")
            print(t_text)
            print("-----------------------------\n")

    return preprocessed_corpus, corpus


def main():
    """Command-line entry point: preprocess a corpus and store it as pickle and CSV.

    Reads the preprocessing parameters from ``--path_to_preprocessing_params``.
    The output base path is ``--target_path`` or, if omitted, the parameter
    path with ``_config``/``config`` removed. A trailing ``.json`` is stripped
    from the base path, and a DataFrame with the columns ``case_id``, ``text``
    and ``preprocessed_text`` is written to ``<base>.df.pkl`` and
    ``<base>.df.csv``.
    """
    # parse arguments:
    sys.path.append(os.getcwd())
    parser = argparse.ArgumentParser()
    parser.add_argument("--path_to_preprocessing_params",
                        default='data/bow_diag_clustering/bow_diag_config.json')
    parser.add_argument("--target_path", default=None)
    args = parser.parse_args()

    with open(args.path_to_preprocessing_params) as json_file:
        params = json.load(json_file)

    print(f"------ Preprocessing parameters: ------")
    print(prepro_params_2_string(params))
    print()

    preprocessed_corpus_dict, corpus_dict = preprocess(params)

    if args.target_path is None:
        args.target_path = args.path_to_preprocessing_params.replace('_config', '').replace('config', '')

    case_ids = list(preprocessed_corpus_dict)
    corpus_as_table = {
        'case_id': case_ids,
        'text': [corpus_dict[c_id] for c_id in case_ids],
        'preprocessed_text': [preprocessed_corpus_dict[c_id] for c_id in case_ids],
    }

    # store results; stripping only a trailing '.json' keeps the two output
    # paths distinct even when the target path has another (or no) extension.
    if args.target_path.endswith('.json'):
        base_path = args.target_path[:-len('.json')]
    else:
        base_path = args.target_path
    df = pd.DataFrame(corpus_as_table)
    df.to_pickle(base_path + '.df.pkl')
    df.to_csv(base_path + '.df.csv')

    # Untokenized texts are single strings; split them so that words rather
    # than characters are counted.
    vocabulary = set()
    for text in preprocessed_corpus_dict.values():
        vocabulary.update(text.split() if isinstance(text, str) else text)

    print(f"saved preprocessed corpus at {args.target_path}, containing {len(preprocessed_corpus_dict)} reports.")
    print(f"Voctabulary size: {len(vocabulary)}")

    # Corpus statistics can be obtained with, e.g.:
    #   print(get_corpus_stats("../DataNephroTexts/description"))
    #   print(get_corpus_stats("../DataNephroTexts/diagnosis"))
    #   print(get_corpus_stats("../DataNephroTexts/end"))


if __name__ == '__main__':
    main()