Commit d5224af3 authored by Maximilian Legnar

added first version

parent a6bfb3de
def cluster_entropy(df):
#%% import
from CorpusHomogeneity.text_entropy import corpus_entropy
import numpy as np
#%%
cluster_ids = np.unique(df.cluster)
ent_mean, ent_std = [None] * len(cluster_ids), [None] * len(cluster_ids)
for idx, i_cluster in enumerate(cluster_ids):
if i_cluster == -1: # important: skip texts that were not assigned to any cluster
ent_mean[idx] = np.nan
ent_std[idx] = np.nan
else:
t_corpus = df[df['cluster'] == i_cluster]
t_corpus = t_corpus.text.tolist()
ent_mean[idx], ent_std[idx] = corpus_entropy(t_corpus)
#%% output
ent_std = np.nanstd(ent_mean)  # spread of the per-cluster entropy means
ent_mean = np.nanmean(ent_mean)
return ent_mean, ent_std
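# Usage sketch (illustrative data, not from the real corpus; assumes the CorpusHomogeneity package is
# importable): cluster_entropy expects a DataFrame with a 'text' column and a 'cluster' column,
# where cluster -1 marks documents that were not assigned to any cluster.
if __name__ == '__main__':
    import pandas as pd
    demo_df = pd.DataFrame({
        'text': ["Niere mit Glomerulonephritis", "Niere mit IgA Nephropathie", "Unauffaellige Niere"],
        'cluster': [0, 0, -1],
    })
    print(cluster_entropy(demo_df))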
#%% tokenize text
def tokenize_corpus(corpus):
#%% imports
import nltk
import string
from nltk.tokenize import word_tokenize
from HanTa import HanoverTagger as ht
stop_words = nltk.corpus.stopwords.words('german')
tagger = ht.HanoverTagger('morphmodel_ger.pgz')
from tqdm import tqdm
# %% read the files to a list
corpus_tokenized = corpus  # note: the passed list is modified in place (each text is replaced by its noun tokens)
for idx, t_text in tqdm(enumerate(corpus_tokenized)):
#%% get the words from the text
t_text = str(t_text)
tokens = word_tokenize(t_text, language='german')
tokens = list(filter(lambda token: token not in string.punctuation, tokens))
#%% get only the nouns
nouns = tagger.tag_sent(tokens)
nouns = [lemma for (word, lemma, pos) in nouns if pos == "NN" or pos == "NE"]
#%% mount it back
corpus_tokenized[idx] = nouns
#%% output-layer
return corpus_tokenized
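# Illustration (hypothetical input/output): tokenize_corpus keeps only the lemmatised nouns and proper
# nouns (HanTa POS tags "NN" / "NE"), so an input like
#   ["Die Niere zeigt eine chronische Glomerulonephritis."]
# would yield roughly [["Niere", "Glomerulonephritis"]].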
#%% calculate the entropy
def corpus_entropy(corpus):
'''
How much do the individual documents differ from the whole corpus?
'''
#%% input layer
#corpus_tokenized = tokenize_corpus(corpus)
import numpy as np
text1 = np.asarray(corpus[0])
is_tokenized = bool(text1.ndim)  # a tokenized text is a list of words (ndim 1), a raw text is a single string (ndim 0)
if is_tokenized:
corpus_not_tokenized = [" ".join(i_text) for i_text in corpus]
else:
corpus_not_tokenized = corpus
# corpus_not_tokenized = [nltk.tokenize.word_tokenize(i_text, language='german') for i_text in corpus]
#%% count the word-occurences
from sklearn.feature_extraction.text import CountVectorizer
import numpy as np
vectorizer = CountVectorizer()
try:
X = vectorizer.fit_transform(corpus_not_tokenized)
except ValueError:  # e.g. empty vocabulary, nothing to vectorize
return np.nan, np.nan
df = X.toarray()
#%% calculate the entropy
from scipy.stats import entropy
import numpy as np
corpus_tf = sum(df)  # corpus-wide term frequencies
corpus_mean = np.mean(df, 0)
ent_values = []
for i in range(0, df.shape[0]):
document_tf = df[i, :]
a = entropy(document_tf, qk=corpus_tf)  # relative entropy (KL divergence) of the document vs. the corpus term distribution
ent_values.append(a)
#%% output-layer
entropy_mean = np.nanmean(ent_values)
entropy_std = np.nanstd(ent_values)
#%%
return entropy_mean, entropy_std
if __name__ == '__main__':
# minimal smoke test with an illustrative (made-up) corpus
print(corpus_entropy(["Niere mit Glomerulonephritis", "Niere mit IgA Nephropathie", "Unauffaellige Niere"]))
# -*- coding: iso-8859-1 -*-
import random
import os, sys
from os import listdir
from os.path import isfile, join
import pyarrow as pa
import pandas as pd
import datasets
from database_preparation.utils_stringpreparation import read_german_text
import argparse
# parse arguments:
sys.path.append(os.getcwd())
parser = argparse.ArgumentParser()
parser.add_argument("--path_to_reports",
default='../DataNephroTexts/reports')
parser.add_argument("--output_path",
default='./LanguageModelling/hf_nephro_set_1')
parser.add_argument("--percentage_train_amount", type=float, default=0.9)
args = parser.parse_args()
def save_as_hf_dataset(datapath, file_id_list, output_path):
report_texts = []
for file_id in file_id_list:
text = read_german_text(datapath + "/" + str(file_id))
report_texts.append(text)
df = pd.DataFrame({
'text': report_texts
})
mytable = pa.Table.from_pandas(df)
my_dataset = datasets.Dataset(mytable)
#my_dataset.save_to_disk(output_path)
my_dataset.to_json(output_path + ".json")
print(f"Generated {output_path}")
def main():
print("processing " + args.path_to_reports)
reports = [f for f in listdir(args.path_to_reports) if isfile(join(args.path_to_reports, f))]
reps0 = [r for r in reports if r[-5] == '0']
random.shuffle(reps0)
last_index = len(reps0) - 1
until = int(args.percentage_train_amount * last_index)
if args.percentage_train_amount < 1:
train = [e for i, e in enumerate(reps0) if i <= until]
val = [e for i, e in enumerate(reps0) if i > until]
save_as_hf_dataset(args.path_to_reports, train, args.output_path + "_train")
save_as_hf_dataset(args.path_to_reports, val, args.output_path + "_validation")
else:
save_as_hf_dataset(args.path_to_reports, reps0, args.output_path)
# how to load a dataset that was written with save_to_disk:
'''ds = datasets.load_from_disk("./LanguageModelling/path2set")
print(ds)'''
return 0
if __name__ == "__main__":
main()
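# Loading sketch for the generated JSON files (the file name below follows the default --output_path;
# adjust it to your own output path):
#   import datasets
#   ds = datasets.load_dataset("json", data_files="./LanguageModelling/hf_nephro_set_1_train.json")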
# NLP in diagnostic texts from nephropathology
This Python project was created as part of the article "Natural Language Processing in diagnostic texts from
nephropathology".
The paper can be found [here](LINK).
The scripts ```database_preparation/data_preparation_pipeline.py```, ```TextClustering/clustering_pipeline.py```
and ```TextClassification/classification_pipeline.py``` give an idea of how this project can be used with other datasets.
The scripts ```TextClustering/basedOn_BOW/kmeans_Diagnosis.py```,
```TextClustering/basedOn_BOW/HDBSCAN_Diagnosis.py``` and ```TextClassification/bow_classification.py```
can also process tf-idf vectorized corpora.
All other scripts can only process corpora that are not vectorized.
Feel free to use and adapt the scripts to your own needs.
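For example, the default pipelines can be started from the repository root (e.g.
```TextClassification/classification_pipeline.py``` explicitly checks that the working directory is
```nlp-in-diagnostic-texts-from-nephropathology```):
```
python database_preparation/data_preparation_pipeline.py
python TextClustering/clustering_pipeline.py
python TextClassification/classification_pipeline.py
```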
## Requirements
For preprocessing, the project requires some nltk corpora:
```
import nltk
nltk.download('stopwords')
nltk.download('punkt')
```
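The noun extraction in ```CorpusHomogeneity/text_entropy.py``` additionally uses the HanTa tagger with its German model:
```
from HanTa import HanoverTagger as ht
tagger = ht.HanoverTagger('morphmodel_ger.pgz')
```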
import numpy as np
import tensorflow_datasets as tfds
import tensorflow as tf
tfds.disable_progress_bar()
from matplotlib import pyplot as plt
import pandas as pd
import os
import sys
from TextClassification.argsparse_classification_preamble import argsparse_preamble
import classification_metrics as cls_metrics
import time
from database_preparation.utils_labeled_datasets import get_splits_for_cross_val
from keras.models import Sequential
from keras.layers import Dense
from keras.layers import Flatten
from keras.layers.convolutional import Conv1D
from keras.layers.convolutional import MaxPooling1D
#from keras.layers.embeddings import Embedding
from tensorflow.keras.layers import Embedding
from keras_preprocessing.sequence import pad_sequences
from database_preparation.utils_labeled_datasets import text_label_files_to_labeled_dataset
from database_preparation.preprocess import print_meta_data
sys.path.append(os.getcwd())
def plot_graphs(history, metric):
plt.close()
plt.plot(history.history[metric])
plt.plot(history.history['val_'+metric])
plt.xlabel("Epochs")
plt.ylabel(metric)
plt.legend([metric, 'val_'+metric])
plt.show()
def save_graphs(history, metric, save_path):
plt.close()
plt.plot(history.history[metric])
plt.plot(history.history['val_' + metric])
plt.xlabel("Epochs")
plt.ylabel(metric)
plt.legend([metric, 'val_' + metric])
#plt.show()
plt.savefig(save_path, dpi=300)
print("generated "+save_path)
def dict2tf_dataset(data):
return tf.data.Dataset.from_tensor_slices(([text for text in data['text']], data['label']))
def train_test_updatemetrics(train_dataset, test_dataset, num_classes, metrics,
num_epochs=50, plot_loss=False,
plot_save_path="TextClassification/plots/CNN/CNN_loss.png"):
train_dataset = dict2tf_dataset(train_dataset)
test_dataset = dict2tf_dataset(test_dataset)
############### Create the text encoder ###################
VOCAB_SIZE = 5000
encoder = tf.keras.layers.TextVectorization(
max_tokens=VOCAB_SIZE)
encoder.adapt(train_dataset.map(lambda text, label: text))
vocab = np.array(encoder.get_vocabulary())
# encode data to word-indices:
X_train = []
y_train = []
X_test = []
y_test = []
for text, label in train_dataset:
X_train.append(encoder(text).numpy())
y_train.append(label)
for text, label in test_dataset:
X_test.append(encoder(text).numpy())
y_test.append(label)
X_train = np.asarray(X_train)
y_train = np.asarray(y_train)
X_test = np.asarray(X_test)
y_test = np.asarray(y_test)
##### create the model: #####
# Padding the data samples to a maximum review length in words
max_words = 450
X_train = pad_sequences(X_train, maxlen=max_words)
X_test = pad_sequences(X_test, maxlen=max_words)
# Building the CNN Model
embedding_dim = 100
filter_amount = 32
filter_size = 3
model = Sequential()  # initializing the sequential CNN model
model.add(Embedding(len(encoder.get_vocabulary()), embedding_dim, input_length=max_words, mask_zero=True))
model.add(Conv1D(filter_amount, filter_size, padding='same', activation='relu'))
model.add(MaxPooling1D())
model.add(Flatten())
model.add(Dense(250, activation='relu'))
model.add(Dense(num_classes, activation='softmax'))
model.compile(loss='sparse_categorical_crossentropy',
optimizer=tf.keras.optimizers.Adam(1e-4),
metrics=["accuracy"])
# model.summary()
start = time.time()
# evaluate:
if plot_loss:
history = model.fit(X_train, y_train, epochs=num_epochs,
batch_size=128, verbose=2,
validation_data=(X_test, y_test),
validation_steps=1,
)
save_graphs(history, 'loss', plot_save_path)
else:
model.fit(X_train, y_train, epochs=num_epochs,
batch_size=128, verbose=2)
predictions = model.predict(X_test)
y_pred = np.argmax(predictions, axis=-1)
metrics.update_metrics(y_test, y_pred, True, start)
def main():
############# get labeled text data ###################
args = argsparse_preamble()
print("CNN Evaluation with corpus " + args.path2corpus + " and cluster set " + args.clustered_data)
print("infos about corpus:")
print_meta_data(args.path2corpus)
dataset = text_label_files_to_labeled_dataset(args.clustered_data, path2corpus=args.path2corpus)
num_classes = int(pd.DataFrame(dataset["label"]).nunique())
metrics = cls_metrics.ClassificationMetrics("CNN")
epochs = 100
folds = 10
for i, (train_dataset, test_dataset) in enumerate(get_splits_for_cross_val(dataset, folds)):
if args.loss_curve_check:
train_test_updatemetrics(train_dataset, test_dataset, num_classes, metrics,
epochs, plot_loss=True,
plot_save_path="TextClassification/plots/CNN/CNN_loss_"+str(i+1)+".png")
else:
print("====== CNN train/test run " + str(i+1) + "/" + str(folds) + " ======")
print(str(len(train_dataset["label"]))+" train documents")
print(str(len(test_dataset["label"])) + " test documents")
train_test_updatemetrics(train_dataset, test_dataset, num_classes, metrics, epochs)
if not args.loss_curve_check:
metrics.save_scores_to_disk(args.clustered_data)
metrics.pickle_object(args.clustered_data)
cls_metrics.print_results_as_latextable(metrics.json_file_path)
if __name__ == '__main__':
main()
import numpy as np
import tensorflow as tf
from matplotlib import pyplot as plt
import pandas as pd
import os
import sys
sys.path.append(os.getcwd())
from TextClassification.argsparse_classification_preamble import argsparse_preamble
import TextClassification.classification_metrics as cls_metrics
import time
from database_preparation.utils_labeled_datasets import get_splits_for_cross_val
from database_preparation.utils_labeled_datasets import text_label_files_to_labeled_dataset
from database_preparation.preprocess import print_meta_data
def save_graphs(history, metric, save_path):
plt.close()
plt.plot(history.history[metric])
plt.plot(history.history['val_' + metric])
plt.xlabel("Epochs")
plt.ylabel(metric)
plt.legend([metric, 'val_' + metric])
#plt.show()
plt.savefig(save_path, dpi=300)
print("generated "+save_path)
def dict2tf_dataset(data):
return tf.data.Dataset.from_tensor_slices(([text.lower() for text in data['text']], data['label']))
def train_test_updatemetrics(train_dataset, test_dataset, num_classes, metrics,
epochs=10, plot_loss=False,
plot_save_path="TextClassification/plots/RNN/RNN_loss.png"):
y_test = np.asarray(test_dataset['label'])
train_dataset = dict2tf_dataset(train_dataset)
test_dataset = dict2tf_dataset(test_dataset)
# Next shuffle the data for training and create batches of these (text, label) pairs:
BUFFER_SIZE = 10000
BATCH_SIZE = 64
train_dataset = train_dataset.shuffle(BUFFER_SIZE).batch(BATCH_SIZE).prefetch(tf.data.AUTOTUNE)
test_dataset = test_dataset.batch(BATCH_SIZE).prefetch(tf.data.AUTOTUNE)
for example, label in train_dataset.take(1):
'''print('text: ', example.numpy())
print('label: ', label.numpy())'''
pass
############### Create the text encoder ###################
VOCAB_SIZE = 5000
encoder = tf.keras.layers.TextVectorization(
max_tokens=VOCAB_SIZE)
encoder.adapt(train_dataset.map(lambda text, label: text))
vocab = np.array(encoder.get_vocabulary())
'''print("vocab info:")
print(vocab[:20])
print(len(encoder.get_vocabulary()))
encoded_example = encoder(example)[:3].numpy()
print(encoded_example)
for n in range(3):
print("Original: ", example[n].numpy())
print("Round-trip: ", " ".join(vocab[encoded_example[n]]))
print()'''
##### create the model: #####
embedding_dim = 64
model = tf.keras.Sequential([
encoder,
tf.keras.layers.Embedding(
input_dim=len(encoder.get_vocabulary()),
output_dim=embedding_dim,
# Use masking to handle the variable sequence lengths
mask_zero=True),
tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(embedding_dim)),
tf.keras.layers.Dense(embedding_dim, activation='relu'),
tf.keras.layers.Dense(num_classes, activation='softmax')
])
# model.summary()
# stacking 2 LSTM layers (seems to be much worse):
'''model = tf.keras.Sequential([
encoder,
tf.keras.layers.Embedding(len(encoder.get_vocabulary()), 64, mask_zero=True),
tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(64, return_sequences=True)),
tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(32)),
tf.keras.layers.Dense(64, activation='relu'),
tf.keras.layers.Dropout(0.5),
tf.keras.layers.Dense(num_classes, activation='softmax')
])'''
# All the layers after the Embedding support masking:
# print([layer.supports_masking for layer in model.layers])
# predict on a sample text without padding.
'''print("test prediction:")
sample_text = ('The movie was cool. The animation and the graphics '
'were out of this world. I would recommend this movie.')
predictions = model.predict(np.array([sample_text]))
print(predictions)'''
model.compile(loss='sparse_categorical_crossentropy',
optimizer=tf.keras.optimizers.Adam(1e-4),
metrics=["accuracy"])
start = time.time()
# evaluate:
if plot_loss:
history = model.fit(train_dataset, epochs=epochs,
validation_data=test_dataset,
validation_steps=2)
test_loss, test_acc = model.evaluate(test_dataset)
print('Test Loss:', test_loss)
print('Test Accuracy:', test_acc)
#save_graphs(history, 'accuracy')
save_graphs(history, 'loss', plot_save_path)
else:
model.fit(train_dataset, epochs=epochs)
predictions = model.predict(test_dataset)
y_pred = np.argmax(predictions, axis=-1)
metrics.update_metrics(y_test, y_pred, True, start)
def main():
############# get labeled text data ###################
# how to convert words 2 ids with gensim:
# words = corpora.Dictionary(diag_lst)
args = argsparse_preamble()
print("RNN Evaluation with corpus " + args.path2corpus + " and cluster set " + args.clustered_data)
print("infos about corpus:")
print_meta_data(args.path2corpus.replace('.pkl', '_meta.pkl'))
# dataset = load_labeled_dataset(args.clustered_data)
dataset = text_label_files_to_labeled_dataset(args.clustered_data, path2corpus=args.path2corpus)
num_classes = int(pd.DataFrame(dataset["label"]).nunique())
metrics = cls_metrics.ClassificationMetrics("RNN")
folds = 10
epochs = 70
for i, (train_dataset, test_dataset) in enumerate(get_splits_for_cross_val(dataset, folds)):
if args.loss_curve_check:
train_test_updatemetrics(train_dataset, test_dataset, num_classes,
metrics, epochs=epochs, plot_loss=True,
plot_save_path="TextClassification/plots/RNN/RNN_loss_"+str(i+1)+".png")
else:
print("====== RNN train/test run " + str(i + 1) + "/" + str(folds) + " ======")
print(str(len(train_dataset["label"])) + " train documents")
print(str(len(test_dataset["label"])) + " test documents")
train_test_updatemetrics(train_dataset, test_dataset, num_classes, metrics, epochs=epochs)
if not args.loss_curve_check:
metrics.save_scores_to_disk(args.clustered_data)
metrics.pickle_object(args.clustered_data)
cls_metrics.print_results_as_latextable(metrics.json_file_path)
if __name__ == '__main__':
main()
import argparse
import sys, os
def argsparse_preamble():
sys.path.append(os.getcwd())
parser = argparse.ArgumentParser()
parser.add_argument("--overwrite", action='store_true')#False: generate data only if it doesn already exist
parser.add_argument("--show_figures", action='store_true')
parser.add_argument("--clustered_data", default="HDBSCAN")
parser.add_argument("--path2corpus", default="database/bow_prepro_desc.pkl")
parser.add_argument("--loss_curve_check", action='store_true')
args = parser.parse_args()
return args
# -*- coding: iso-8859-1 -*-
import os
from sklearn.naive_bayes import MultinomialNB
from sklearn.linear_model import LogisticRegression
from sklearn.linear_model import SGDClassifier # stochastic gradient descent (SGD) learning
from sklearn.neural_network import MLPClassifier
from sklearn.decomposition import TruncatedSVD
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.pipeline import Pipeline
from TextClassification.argsparse_classification_preamble import argsparse_preamble
import database_preparation.utils_labeled_datasets as dt
from database_preparation.utils_labeled_datasets import get_splits_for_cross_val
import TextClassification.classification_metrics as cls_metrics
from database_preparation.preprocess import print_meta_data
from database_preparation.utils_labeled_datasets import is_text_lst_tfidf_vectorized
import pickle
import numpy as np
import pandas as pd
'''from imblearn.under_sampling import RandomUnderSampler
from imblearn.over_sampling import RandomOverSampler
from imblearn.pipeline import make_pipeline as make_pipeline_imb'''
# experiment:
merge_classes = [(0, 1), (5, 7), (9, 10, 11), (6, 15)]
# for tfidf vectorizer
def identity(words):
return words
def create_pipeline(estimator, reduction=False, with_vectorizer=True):
'''
Constructs a pipeline with sklearn.pipeline.
The passed estimator will be the last element of the pipeline;
tf-idf is used as the vectorizer.
'''
steps = []
if with_vectorizer:
steps.append(
('vectorize', TfidfVectorizer(tokenizer=identity, preprocessor=None, lowercase=False))
)
if reduction:
steps.append((
'reduction', TruncatedSVD(n_components=1000)
))
# Add the estimator
steps.append(('classifier', estimator))
return Pipeline(steps)
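# Usage sketch (train_texts / train_labels / test_texts are placeholders): a tf-idf + SGD pipeline,
# optionally with a TruncatedSVD reduction step; the input texts are expected to be tokenized already,
# since the vectorizer uses the identity tokenizer.
#   pipe = create_pipeline(SGDClassifier(), reduction=True)
#   pipe.fit(train_texts, train_labels)
#   y_pred = pipe.predict(test_texts)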
def get_immediate_subdirectories(a_dir):
return [name for name in os.listdir(a_dir)
if os.path.isdir(os.path.join(a_dir, name))]
def cross_validate_with_bow_classifiers(label_set, fold_amount=10,
path2corpus="./database/bow_prepro_desc.pkl",
df_cases_file="database/df_cases.pkl"):
'''
Cross-validates the passed label_set with the text data saved in path2corpus and the labels saved in df_cases_file.
path2corpus should point to a list of reports where each report is tokenized,
or to a tf-idf vectorized corpus (of type scipy.sparse.csr_matrix).
'''
if is_text_lst_tfidf_vectorized(path2corpus):
models = []
models.append(create_pipeline(MultinomialNB(), with_vectorizer=False))
models.append(create_pipeline(MLPClassifier(max_iter=300), with_vectorizer=False))
models.append(create_pipeline(LogisticRegression(), with_vectorizer=False))
models.append(create_pipeline(SGDClassifier(), with_vectorizer=False))
with open(path2corpus, 'rb') as f:
loaded_texts = pickle.load(f)
n = np.asarray(loaded_texts.todense().tolist())
from database_preparation.utils_labeled_datasets import text_label_2_labeled_dataset
df_cases = pd.read_pickle(df_cases_file)
dataset = text_label_2_labeled_dataset(n, df_cases["label_" + label_set])
else:
# create model-pipelines for cross-validation with different pipelines:
models = []
models.append(create_pipeline(SGDClassifier()))
models.append(create_pipeline(MultinomialNB()))
models.append(create_pipeline(LogisticRegression()))
models.append(create_pipeline(MLPClassifier(max_iter=300)))
'''models.append(make_pipeline_imb(TfidfVectorizer(tokenizer=identity, preprocessor=None, lowercase=False)
, RandomOverSampler(), SGDClassifier()))'''
# print(f"train models {[model['classifier'] for model in models]} with corpus {path2corpus} and cluster set {label_set}")
print("infos about corpus:")
print_meta_data(path2corpus)
dataset = dt.text_label_files_to_labeled_dataset(label_set, path2corpus=path2corpus,
df_cases_path=df_cases_file)
# in order to use the same 10-fold cross-validation splits for each model:
k_train_test_sets = []
for (train_dataset, test_dataset) in get_splits_for_cross_val(dataset,
fold_amount, merge_classes=None, oversample=False, stratified=True):
k_train_test_sets.append(tuple((train_dataset, test_dataset)))
# cross validate each model and save metrics:
for model in models:
print('running ' + str(model['classifier']))
name = model.named_steps['classifier'].__class__.__name__
if 'reduction' in model.named_steps:
name += " (TruncatedSVD)"
metrics = cls_metrics.ClassificationMetrics(name)
for i, (train_dataset, test_dataset) in enumerate(k_train_test_sets):
model.fit(train_dataset['text'], train_dataset['label'])
y_pred = model.predict(test_dataset['text'])
metrics.update_metrics(test_dataset['label'], y_pred, False)
metrics.save_scores_to_disk(label_set)
metrics.pickle_object(label_set)
df = metrics.classes_scores(-1)
print(df.to_latex().replace('{}', 'cluster'))
cls_metrics.print_results_as_latextable(metrics.json_file_path)
def main():
args = argsparse_preamble()
cross_validate_with_bow_classifiers(args.clustered_data, path2corpus=args.path2corpus)
if __name__ == '__main__':
main()
# -*- coding: iso-8859-1 -*-
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import SGDClassifier
import sys
import database_preparation.utils_labeled_datasets as dt
# for training validation:
import TextClassification.classification_metrics as cls_metrics
from sklearn.decomposition import TruncatedSVD
import pandas as pd
import numpy as np
from sklearn.model_selection import KFold
from sklearn.svm import SVC
from sklearn.pipeline import Pipeline
import nltk
import datasets
import pyarrow as pa
import pickle
fold_amount = 10
#%%
# for tfidf vectorizer
def identity(words):
return words
def create_pipeline(estimator, reduction=False):
'''
Constructs a pipeline with sklearn.pipeline.
The passed estimator will be the last element of the pipeline;
tf-idf is used as the vectorizer.
'''
steps = []
steps.append(
('vectorize', TfidfVectorizer(tokenizer=identity, preprocessor=None, lowercase=False))
)
if reduction:
steps.append((
'reduction', TruncatedSVD(n_components=1000)
))
# Add the estimator
steps.append(('classifier', estimator))
return Pipeline(steps)
def cross_validate_with_simple_SVM(label_set, path2corpus = "./database/bow_prepro_diag.pkl", path2dfcases='./database/df_cases.pkl'):
"""
trains a simple SVM with the given data
returns 10-fold-cross-validated accuracy value
"""
print(f"Calculating SVM-classification performance of {label_set} cluster-setr "
f"with text corpus {path2corpus}.")
metrics = cls_metrics.ClassificationMetrics(label_set)
#print("train SVM for cluster " + label_set + " with " + path2corpus + ".")
text_lst = pd.read_pickle(path2corpus)
text1 = np.asarray(text_lst[0])
corpus_is_tokenized = bool(text1.ndim)
del text1, text_lst
if corpus_is_tokenized:
dataset = dt.text_label_files_to_labeled_dataset(label_set,
path2corpus
, path2dfcases, False)
else:
dataset_raw = dt.text_label_files_to_labeled_dataset(label_set,
path2corpus
, path2dfcases, False)
# tokenize
tokenized_texts = []
for t_text in dataset_raw['text']:
tokenized_texts.append(nltk.tokenize.word_tokenize(t_text, language='german'))
df = pd.DataFrame({'text': tokenized_texts, 'label': dataset_raw['label']})
dataset = datasets.Dataset(pa.Table.from_pandas(df))
# 10-fold cross-validation:
folds = KFold(n_splits=10, shuffle=False)
for i, (train_index, test_index) in enumerate(folds.split(list(range(len(dataset))))):
train_dataset = dataset[train_index]
test_dataset = dataset[test_index]
pipe = create_pipeline(SGDClassifier())
pipe.fit(train_dataset['text'], train_dataset['label'])
y_pred = pipe.predict(test_dataset['text'])
metrics.update_metrics(test_dataset['label'], y_pred, False)
# train_save_SVM_for_clusterset_evaluation(label_set)
# metrics.save_scores_to_disk("diagnose_texts_with_SGD")
return metrics
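# Usage sketch (arguments are the repository defaults): the returned ClassificationMetrics object
# collects one accuracy value per fold, so the mean accuracy can be read off like this:
#   metrics = cross_validate_with_simple_SVM("HDBSCAN")
#   print(np.mean(metrics.scores['accuracy']))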
def cross_validate_label_corpus_with_simple_SVM(labels, path2corpus = "./database/bow_prepro_diag.pkl", sample = True):
"""
trains a simple SVM with the given data
returns 10-fold-cross-validated accuracy value
"""
texts = pd.read_pickle(path2corpus)
from database_preparation.utils_labeled_datasets import text_label_2_labeled_dataset
metrics = cls_metrics.ClassificationMetrics("temp")
#print("train SVM for cluster " + label_set + " with " + path2corpus + ".")
text_lst = pd.read_pickle(path2corpus)
text1 = np.asarray(text_lst[0])
corpus_is_tokenized = bool(text1.ndim)
del text1, text_lst
if corpus_is_tokenized:
dataset = text_label_2_labeled_dataset(texts,labels)
else:
dataset_raw = text_label_2_labeled_dataset(texts,labels)
# tokenize
tokenized_texts = []
for t_text in dataset_raw['text']:
tokenized_texts.append(nltk.tokenize.word_tokenize(t_text, language='german'))
df = pd.DataFrame({'text': tokenized_texts, 'label': dataset_raw['label']})
dataset = datasets.Dataset(pa.Table.from_pandas(df))
# 10-fold cross-validation:
folds = KFold(n_splits=10, shuffle=False)
for i, (train_index, test_index) in enumerate(folds.split(list(range(len(dataset))))):
train_dataset = dataset[train_index]
test_dataset = dataset[test_index]
pipe = create_pipeline(SGDClassifier())
pipe.fit(train_dataset['text'], train_dataset['label'])
y_pred = pipe.predict(test_dataset['text'])
metrics.update_metrics(test_dataset['label'], y_pred, False)
if sample:
return metrics.scores['accuracy']
# train_save_SVM_for_clusterset_evaluation(label_set)
# metrics.save_scores_to_disk("diagnose_texts_with_SGD")
return np.mean(metrics.scores['accuracy'])
def train_SVM_with_clusterset(label_set, path2corpus = "./database/bow_prepro_diag.pkl", path2dfcases='./database/df_cases.pkl'):
"""
trains ans saves a svm, trained with the whole data under as:
"./ModelTestingAndExplaining/models/SVM_trained_with_" + label_set + "_clustered.pkl"
"""
print("train SVM for cluster " + label_set + " with " + path2corpus + ".")
text_lst = pd.read_pickle(path2corpus)
text1 = np.asarray(text_lst[0])
corpus_is_tokenized = bool(text1.ndim)
del text1, text_lst
if corpus_is_tokenized:
dataset = dt.text_label_files_to_labeled_dataset(label_set,
path2corpus
, path2dfcases, False)
else:
dataset_raw = dt.text_label_files_to_labeled_dataset(label_set,
path2corpus
, path2dfcases, False)
# tokenize
tokenized_texts = []
for t_text in dataset_raw['text']:
tokenized_texts.append(nltk.tokenize.word_tokenize(t_text, language='german'))
df = pd.DataFrame({'text': tokenized_texts, 'label': dataset_raw['label']})
dataset = datasets.Dataset(pa.Table.from_pandas(df))
pipe = create_pipeline(SVC(probability=True, kernel='linear'))
'''svd = TruncatedSVD(n_components=100, n_iter=7, random_state=42)
pipe = make_pipeline(make_pipeline(
TfidfVectorizer(tokenizer=identity, preprocessor=None, lowercase=False),svd),
SVC(C=150, gamma=2e-2, probability=True))'''
pipe.fit(dataset['text'], dataset['label'])
path = "./ModelTestingAndExplaining/models/SVM_trained_with_" + label_set + "_clustered.pkl"
pickle.dump(pipe, open(path, 'wb'))
def update_cls_metric(label_set, cls_accuracy):
file_name = label_set + "_Diagnosis"
file_name = file_name.replace('KMeans', 'kmeans')
file_name = file_name.replace('d2v', 'doc2vec')
file_path = "TextClustering/cluster_metrics/" + file_name + ".pkl"
try:
scores = pd.DataFrame(pd.read_pickle(file_path))
except FileNotFoundError:  # no metrics file for this cluster set yet
return
if 'cls accuracy' in scores.index:
scores[file_name]['cls accuracy'] = cls_accuracy
new_scores = scores
else:
vals = list(scores[file_name])
new_index = scores.index.append(pd.Index(['cls accuracy']))
vals.append(cls_accuracy)
new_scores = pd.DataFrame({file_name: vals}, index=new_index)
new_scores.to_pickle(file_path)
def update_cls_metric_for_each_clusterset():
'''
Does a 10-fold cross-validation with an SVM for each cluster set saved in './database/df_cases.pkl',
always using the texts in 'database/diag_lst_tokenized.pkl'.
'''
label_sets = dt.get_all_label_set_ids()
# label_sets = ["German_BERT"]
for label_set in label_sets:
accuracy = np.mean(cross_validate_with_simple_SVM(label_set,
'database/diag_lst_tokenized.pkl',
'./database/df_cases.pkl').scores['accuracy'])
print("svm-cls-accuracy of cluster set "+label_set+": "+str(accuracy))
update_cls_metric(label_set, accuracy)
def main():
#update_cls_metric_for_each_clusterset()
cluster_set_name = "German_BERT"
#text_data = 'database/darmischaemie_prostata_txt_lst.pkl' cluster_set_dict = './database/df_cases2.pkl'
text_data = 'database/diag_lst.pkl'
#text_data = 'database/diag_lst_tokenized.pkl'
cluster_set_dict = './database/df_cases.pkl'
train_SVM_with_clusterset(cluster_set_name, text_data, cluster_set_dict)
if __name__ == '__main__':
main()
import os
####### pipeline parameters ################
#cluster_sets = ['KMeans', 'LDA', 'HDBSCAN', 'GSDPMM', 'German_BERT', 'Patho_BERT', 'top2vec']
cluster_sets = ['HDBSCAN']
# params:
path2corpus_bow_preprocessed = 'database/bow_prepro_desc.pkl'
path2corpus_embedding_preprocessed = 'database/embedding_prepro_desc.pkl'
#check working directory:
workdir = os.getcwd()
if not workdir.endswith('nlp-in-diagnostic-texts-from-nephropathology'):
print(workdir + " is the wrong working directory.")
print("Please make sure to run this script with working directory '.../path/to/nlp-in-diagnostic-texts-from-nephropathology'.")
exit(1)
for cluster_set in cluster_sets:
script_queue = [
f"python TextClassification/bow_classification.py --clustered_data {cluster_set} --path2corpus {path2corpus_bow_preprocessed}",
f"python TextClassification/RNN_classification.py --clustered_data {cluster_set} --path2corpus {path2corpus_embedding_preprocessed}",
f"python TextClassification/CNN_classification.py --clustered_data {cluster_set} --path2corpus {path2corpus_embedding_preprocessed}",
#f"python TextClassification/bert_classification.py --clustered_data {cluster_set} --path2corpus {path2corpus_embedding_preprocessed}",
f"python TextClassification/print_classification_metrics.py --clustered_data {cluster_set}"
]
for script in script_queue:
print("\n########################################### executing ###########################################")
print(script)
print("####################################################################################################\n")
os.system(script)
import matplotlib.pyplot as plt
import math
import json
import argparse
def plot_loss_curve(path2json, title='loss'):
with open(path2json) as f:
log_history = json.load(f)["log_history"]
# Keep track of train and evaluate loss.
loss_history = {'train_loss': [], 'eval_loss': [],
'train_steps': [], 'train_epochs': [],
'eval_steps': [], 'eval_epochs': []}
# Keep track of train and evaluate perplexity.
# This is a metric useful to track for language models.
perplexity_history = {'train_perplexity': [], 'eval_perplexity': []}
for log in log_history:
if 'loss' in log.keys():
# Deal with training loss.
loss_history['train_loss'].append(log['loss'])
perplexity_history['train_perplexity'].append(math.exp(log['loss']))
loss_history['train_epochs'].append(log["epoch"])
loss_history['train_steps'].append(log["step"])
elif 'eval_loss' in log.keys():
# Deal with eval loss.
loss_history['eval_loss'].append(log['eval_loss'])
perplexity_history['eval_perplexity'].append(math.exp(log['eval_loss']))
loss_history['eval_epochs'].append(log["epoch"])
loss_history['eval_steps'].append(log["step"])
# Plot Losses.
plt.figure()
plt.plot(loss_history['eval_epochs'], loss_history["eval_loss"],
label="eval loss")
plt.plot(loss_history['train_epochs'], loss_history["train_loss"],
label="train loss")
plt.xlabel("epoch", fontsize=14)
plt.ylabel("loss", fontsize=14)
plt.title(title, fontsize=16)
plt.grid(True)
plt.legend()
plt.show()
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--path_to_trainer_state_file",
default='./LanguageModelling/ger-patho-bert-w3/trainer_state.json')
args = parser.parse_args()
# example how to plot loss curve:
plot_loss_curve(args.path_to_trainer_state_file,
args.path_to_trainer_state_file.replace('/trainer_state.json',''))
if __name__ == '__main__':
main()
import TextClassification.classification_metrics as cls_metrics
import glob
import sys, os
from TextClassification.argsparse_classification_preamble import argsparse_preamble
import pickle
import database_preparation.utils_labeled_datasets as dt
sys.path.append(os.getcwd())
# script parameters:
metrics_folder = "cls_metrics/metrics_new"
def generate_save_conf_matrix(model_name, clustered_data,
test_set_index):
ob_dir = "./TextClassification/" + metrics_folder + "/" \
+ clustered_data + "_clustered_" + model_name + "_classified.pickle"
try:
with open(ob_dir, 'rb') as f:
metrics = pickle.load(f)
except FileNotFoundError:
return False
# plot confusion matrix
if "ger-patho-bert" in model_name:
titlename = "Patho-BERT"
elif "german" in model_name:
titlename = "German-BERT"
elif "SGD" in model_name:
titlename = "SGD-classifier"
elif "MLP" in model_name:
titlename = "MLP-classifier"
else:
titlename = model_name
unique_labels = dt.get_amount_unique_labels(clustered_data)
labels = [a for a in range(unique_labels)]
# labels=['class'+str(a) for a in range(unique_labels)]
# https://matplotlib.org/3.5.1/tutorials/colors/colormaps.html
metrics.plot_confusion_matrix(labels, prediction_set=test_set_index,
plot=False, save=True,
filename="confmatrx_"+clustered_data + "_clustered_" + model_name + "_classified",
title= titlename, normalized=True, annot = False, colormap='gist_heat')
return True
def print_f1_per_clusters(model_name, clustered_data):
# print f1-scores for each class of a single test run
ob_dir = "./TextClassification/" + metrics_folder + "/" \
+ clustered_data + "_clustered_" + model_name + "_classified.pickle"
try:
with open(ob_dir, 'rb') as f:
metrics = pickle.load(f)
except FileNotFoundError:
return False
print("================ model: " + model_name + " | cluster-set: " + clustered_data + " ================")
df = metrics.classes_scores(-1)
print(df.to_latex().replace('{}', 'cluster'))
return True
def main():
args = argsparse_preamble()
model_names = ['SGDClassifier', 'MLPClassifier', 'ger-patho-bert-2', 'bert-base-german-cased',
'CNN', 'RNN', 'LogisticRegression', 'MultinomialNB']
# print f1 scores for each classification model:
for model in model_names:
print_f1_per_clusters(model, args.clustered_data)
# print classification overview tables for each clustering method:
print()
file_list = glob.glob("./TextClassification/" + metrics_folder + "/" + '/*.json')
for file in file_list:
cls_metrics.print_results_as_latextable(file, True)
print()
# generate all confusion matrices for each classification model:
for model in model_names:
try:
generate_save_conf_matrix(model, args.clustered_data, -1)
except Exception:
print(f"can't generate confusion matrix for {model}")
if __name__ == '__main__':
main()
import argparse
import sys, os
def argsparse_preamble():
sys.path.append(os.getcwd())
parser = argparse.ArgumentParser()
parser.add_argument("--find_k_value", action='store_true')
parser.add_argument("--k_value", type=int, default=10)
parser.add_argument("--show_figures", action='store_true')
parser.add_argument("--model2use", default="German_BERT")
parser.add_argument('--do_embedding', action='store_true')
parser.add_argument("--path2corpus", default='database/bow_prepro_diag.pkl')
parser.add_argument("--df_cases_file", default='database/df_cases.pkl')
args = parser.parse_args()
return args
#%%
from __future__ import unicode_literals, print_function, division
import csv
import numpy as np
class GSDPMM:
def __init__(self, K, alpha, beta, iterNum, dataset):
self.K=K
self.alpha=alpha
self.beta=beta
self.iterNum=iterNum
self.dataset=dataset
self.docu_set=docu_set(self.dataset)
self.docu_num=self.docu_set.docu_num
self.V=self.docu_set.V
self.alpha0=K*self.alpha
self.beta0=self.V*beta
self.m_z=np.zeros(K,dtype=int)
self.n_z=np.zeros(K,dtype=int)
self.n_zv=np.zeros([K,self.V],dtype=int)
self.z_c=np.zeros(self.docu_num,dtype=int)
self.num_list=self.docu_set.num_list
self.wordid_array=self.docu_set.wordid_array
self.wordfreq_array=self.docu_set.wordfreq_array
self.largedouble=1e100
self.smalldouble=1e-100
def initialize(self):
for d in range(self.docu_num):
self.z_c[d]=int(np.floor(self.K*np.random.uniform()))
cluster=self.z_c[d]
self.m_z[cluster]=self.m_z[cluster]+1
for w in range(len(self.num_list[d])):
self.n_zv[cluster][self.num_list[d][w]]=self.n_zv[cluster][self.num_list[d][w]]+1
self.n_z[cluster]=self.n_z[cluster]+1
def gibbs_sampling(self):
for i in range(self.iterNum):
for d in range(self.docu_num):
cluster=self.z_c[d]
self.m_z[cluster]=self.m_z[cluster]-1
for w in range(len(self.num_list[d])):
self.n_zv[cluster][self.num_list[d][w]]=self.n_zv[cluster][self.num_list[d][w]]-1
self.n_z[cluster]=self.n_z[cluster]-1
cluster=self.sample_cluster(d)
self.z_c[d]=cluster
self.m_z[cluster]=self.m_z[cluster]+1
for w in range(len(self.num_list[d])):
self.n_zv[cluster][self.num_list[d][w]]=self.n_zv[cluster][self.num_list[d][w]]+1
self.n_z[cluster]=self.n_z[cluster]+1
print(f'iteration {i}/{self.iterNum}')
def sample_cluster(self, d):
prob=np.zeros(self.K)
overflow_count=np.zeros(self.K)
for k in range(self.K):
prob[k]=(self.m_z[k]+self.alpha)/(self.docu_num+self.alpha0)
value2=1.0
i=0
for w in range(len(self.wordid_array[d])):
wordNo=self.wordid_array[d][w]
wordfreq=self.wordfreq_array[d][w]
for j in range(wordfreq):
value2=value2*(self.n_zv[k][wordNo]+self.beta+j)/(self.n_z[k]+self.beta0+i)
i=i+1
if value2<self.smalldouble:
overflow_count[k]=overflow_count[k]-1
value2=value2*self.largedouble
prob[k]=prob[k]*value2
self.recompute_prob(prob, overflow_count, self.K)
for k in range(1,self.K):
prob[k]=prob[k-1]+prob[k]
sample=np.random.uniform()*prob[self.K-1]
kchoosed=0
for kchoosed in range(self.K):
if sample<prob[kchoosed]:
break
return kchoosed
def recompute_prob(self, prob, overflow_count, K):
max_common=-1e20
for k in range(K):
if overflow_count[k]>max_common and prob[k]>0:
max_common=overflow_count[k]
for k in range(K):
if prob[k]>0:
prob[k]=prob[k]*pow(self.largedouble,overflow_count[k]-max_common)
class docu_set:
def __init__(self, dataset):
self.docu_num=0
self.docs=[]
self.result=self.read_data(dataset)
self.lines=self.result[0]
self.wordtoId={}
self.wordfreq={}
self.V=len(self.wordtoId)
self.num_list, self.wordid_array, self.wordfreq_array=self.convert_to_numlist()
def read_data(self,filename):
data=[]
target=[]
with open(filename,'r') as csvfile:
line_reader=csv.reader(csvfile)
for line in line_reader:
data.append(line[2])
#target.append(line[3])
self.docu_num=len(data)
print(len(data))
return [data,target]
def convert_to_numlist(self):
n_lines=len(self.lines)
num_list=[[] for i in range(n_lines)]
wordid_array=[[] for i in range(n_lines)]
wordfreq_array=[[] for i in range(n_lines)]
for i in range(n_lines):
this_line=self.lines[i]
split_line=this_line.split()
for j in range(len(split_line)):
if split_line[j] in self.wordtoId:
self.wordfreq[self.wordtoId[split_line[j]]]=self.wordfreq[self.wordtoId[split_line[j]]]+1
Id=self.wordtoId.get(split_line[j])
if Id in wordid_array[i]:
wordfreq_array[i][wordid_array[i].index(Id)]+=1
else:
wordid_array[i].append(Id)
wordfreq_array[i].append(1)
else:
self.wordtoId[split_line[j]]=self.V
self.V=self.V+1
self.wordfreq[self.wordtoId[split_line[j]]]=1
Id=self.wordtoId.get(split_line[j])
if Id in wordid_array[i]:
wordfreq_array[i][wordid_array[i].index(Id)]+=1
else:
wordid_array[i].append(Id)
wordfreq_array[i].append(1)
num_list[i].append(self.wordtoId[split_line[j]])
return num_list, wordid_array, wordfreq_array
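# Usage sketch (parameter values follow the defaults used by the clustering script below; the CSV is
# expected to hold the document text in its third column, since read_data() reads line[2]):
#   gsdpmm = GSDPMM(K=10, alpha=0.3, beta=0.02, iterNum=5, dataset='TextClustering/basedOn_BOW/temp.csv')
#   gsdpmm.initialize()
#   gsdpmm.gibbs_sampling()
#   cluster_labels = gsdpmm.z_c  # one cluster id per document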
#%% argsparse section
import sys, os
sys.path.append(os.getcwd())
from TextClustering.argsparse_clustering_preamble import argsparse_preamble
args = argsparse_preamble()
from database_preparation.utils_labeled_datasets import is_text_lst_tokenized
#%% import section
import pickle
from TextClustering.basedOn_BOW.GSDPMM import *
import pandas as pd
from sklearn.feature_extraction.text import TfidfVectorizer
import matplotlib.pyplot as plt
import umap
from tqdm import tqdm
from TextClustering.utils_metrics import ClusterMetrics
from database_preparation.preprocess import print_meta_data
from TextClassification.classification_for_cluster_evaluation import cross_validate_label_corpus_with_simple_SVM
#%% load the data
with open(args.path2corpus, 'rb') as f:
diag_lst = pickle.load(f)
print_meta_data(args.path2corpus)
#%% and save it for DPMM
text = ['text'] * len(diag_lst)
if is_text_lst_tokenized(args.path2corpus):
text_tupls = list(zip(text, [' '.join(tokenized_text) for tokenized_text in diag_lst]))
else:
text_tupls = list(zip(text, [text for text in diag_lst]))
df = pd.DataFrame(text_tupls)
df.to_csv('TextClustering/basedOn_BOW/temp.csv', header=None)
def identity(word):
return word
def create_vectorizer(data):
vec = TfidfVectorizer(tokenizer=identity, preprocessor=None, lowercase=False)
return vec.fit_transform(data)
text_features = create_vectorizer(diag_lst)
#%% find the best hyperparameter
if args.find_k_value:
# %% set the parameter
args.alpha = 0.3
args.beta = 0.02
args.iterNum = 5
args.dataset = 'TextClustering/basedOn_BOW/temp.csv'
beta_list = np.arange(3, 23, 1)  # candidate numbers of clusters K (despite the variable name)
s_score, n_cluster, svm_scores = [], [], []
n_steps = []
for i in tqdm(beta_list):
#%% initialize it
gsdmm = GSDPMM(i,
args.alpha, args.beta,
args.iterNum,
args.dataset)
gsdmm.initialize()
# %% actually do it
gsdmm.gibbs_sampling()
#%% evaluate the model
evaluation = ClusterMetrics(text_features, gsdmm.z_c)
s_score.append(evaluation.s_score)
svm_scores.append(
cross_validate_label_corpus_with_simple_SVM(gsdmm.z_c, args.path2corpus,
False))
n_cluster.append(len(np.unique(gsdmm.z_c)))
n_steps.append(i)
#%% plot it
fig, ax1 = plt.subplots()
ax2 = ax1.twinx()
ax3 = ax1.twinx()
ax1.plot(n_steps, s_score, 'bx-')
ax2.plot(n_steps, n_cluster, 'rx-')
ax3.plot(n_steps, svm_scores, 'gx-')
ax1.set_xlabel('Number of clusters K')
ax1.yaxis.label.set_color('blue')
ax1.set_ylabel('s-score')
ax2.yaxis.label.set_color('red')
ax2.set_ylabel('Number of clusters')
ax3.yaxis.label.set_color('green')
ax3.set_ylabel('svm accuracy')
plt.title('Elbow-method-like plot')
plt.savefig("TextClustering/plots/elbow_method/GSDPMM_elbow_plot.png", dpi=300)
plt.show()
sys.exit()
#%% set the parameter
args.alpha = 0.3
args.beta = 0.02
args.iterNum = 5
args.dataset = 'TextClustering/basedOn_BOW/temp.csv'
gsdmm=GSDPMM(args.k_value,
args.alpha, args.beta,
args.iterNum,
args.dataset)
gsdmm.initialize()
#%% actually do it
gsdmm.gibbs_sampling()
#%% retrieve the results
A=gsdmm.z_c
num_list=gsdmm.num_list
m_z=gsdmm.m_z
n_z=gsdmm.n_z
n_zv=gsdmm.n_zv
docu_num=gsdmm.docu_num
predictedCluster=gsdmm.z_c
wordid_array=gsdmm.wordid_array
wordfreq_array=gsdmm.wordfreq_array
#%% save UMAP data points:
umap_text_features2D = umap.UMAP(n_neighbors=15,
n_components=2,
min_dist=0.0, metric='cosine').fit_transform(text_features)
# save umaped vectors and labels:
df = pd.read_pickle(args.df_cases_file)
df['umapX_GSDPMM'] = umap_text_features2D[:, 0]
df['umapY_GSDPMM'] = umap_text_features2D[:, 1]
df['label_GSDPMM'] = predictedCluster
df.to_pickle(args.df_cases_file)
#%% evaluate the model
from TextClustering.utils_metrics import ClusterMetrics
evaluation = ClusterMetrics(text_features, predictedCluster,
file_name= "TextClustering/cluster_metrics/GSDPMM_metrics.pkl")
evaluation.write_to_file()
#%% argsparse section
import sys, os
sys.path.append(os.getcwd())
from TextClustering.argsparse_clustering_preamble import argsparse_preamble
args = argsparse_preamble()
from database_preparation.utils_labeled_datasets import is_text_lst_tokenized, is_text_lst_tfidf_vectorized
import pickle
from sklearn.feature_extraction.text import TfidfVectorizer
import matplotlib.pyplot as plt
import pandas as pd
import umap
import hdbscan
import numpy as np
from nltk import RegexpTokenizer
from TextClustering.utils_metrics import ClusterMetrics
from database_preparation.preprocess import print_meta_data
from TextClassification.classification_for_cluster_evaluation import cross_validate_label_corpus_with_simple_SVM
tokenizer = RegexpTokenizer(r'\w+')
#%% load the data
with open(args.path2corpus, 'rb') as f:
diag_lst = pickle.load(f)
print_meta_data(args.path2corpus)
def identity(word):
return word
text_is_vectorized = is_text_lst_tfidf_vectorized(args.path2corpus)
if not (is_text_lst_tokenized(args.path2corpus) or text_is_vectorized):
print("Error: " + args.path2corpus + ' has to be a list of tokenized texts or tfidf-vectorized texts!')
exit(1)
def create_vectorizer(data):
vec = TfidfVectorizer(tokenizer=identity, preprocessor=None, lowercase=False)
vec = vec.fit_transform(data)
return vec
if text_is_vectorized:
text_features = diag_lst
else:
text_features = create_vectorizer(diag_lst)
#%% perform umap for dimension-reduction (for cluster-detection)
umap_text_features = umap.UMAP(n_neighbors=15,
n_components=5,
metric='cosine').fit_transform(text_features)
# and perform umap-dimension-reduction for visualization
umap_text_features2D = umap.UMAP(n_neighbors=15,
n_components=2,
min_dist=0.0, metric='cosine').fit_transform(text_features)
if args.find_k_value:
# %% perform hdbscan for cluster detection with different cluster sizes to find a good solution (inspected by eye)
list_cluster_size = [int(k) for k in np.arange(3, 23, 1)]
s_score, n_cluster, svm_scores = [], [], []
for i_cluster_size in list_cluster_size:
cluster = hdbscan.HDBSCAN(min_cluster_size=i_cluster_size,
metric='euclidean',
cluster_selection_method='eom').fit(umap_text_features)
result = pd.DataFrame(umap_text_features2D, columns=['x', 'y'])
result['labels'] = cluster.labels_.tolist() # cluster.labels_
print(np.unique(result.labels))
#%% Visualize clusters
outliers = result.loc[result.labels == -1, :]
clustered = result.loc[result.labels != -1, :]
clustered['labels'] = [str(i) for i in clustered['labels']]
evaluation = ClusterMetrics(umap_text_features, cluster.labels_.tolist())
s_score.append(evaluation.s_score)
svm_scores.append(
cross_validate_label_corpus_with_simple_SVM(cluster.labels_.tolist(), args.path2corpus,
False))
n_cluster.append(len(np.unique(cluster.labels_.tolist())))
fig, ax1 = plt.subplots()
ax2 = ax1.twinx()
ax3 = ax1.twinx()
ax1.plot(list_cluster_size, s_score, 'bx-')
ax2.plot(list_cluster_size, n_cluster, 'rx-')
ax3.plot(list_cluster_size, svm_scores, 'gx-')
ax1.set_xlabel('Minimal cluster size')
ax1.yaxis.label.set_color('blue')
ax1.set_ylabel('s-score')
ax2.yaxis.label.set_color('red')
ax2.set_ylabel('Number of clusters')
ax3.yaxis.label.set_color('green')
ax3.set_ylabel('svm accuracy')
plt.title('Elbow-method-like plot')
plt.savefig("TextClustering/plots/elbow_method/HDBSCAN_elbow_plot.png", dpi=300)
plt.show()
exit()
#%% perform hdbscan with best cluster size
cluster = hdbscan.HDBSCAN(min_cluster_size=args.k_value,
metric='euclidean',
cluster_selection_method='eom').fit(umap_text_features)
result = pd.DataFrame(umap_text_features2D, columns=['x', 'y'])
result['labels'] = cluster.labels_.tolist() # cluster.labels_
clusters = np.int8([str(i) for i in result['labels']])
outliers = result.loc[result.labels == -1, :]
clusters_no_outliers = result.loc[result.labels != -1, :]
unique_clusters = np.unique(result.labels)
print(f"\nfound {len(unique_clusters[unique_clusters>-1])} clusters.\n")
# save umaped vectors:
df = pd.read_pickle(args.df_cases_file)
df['umapX_HDBSCAN'] = umap_text_features2D[:, 0]
df['umapY_HDBSCAN'] = umap_text_features2D[:, 1]
df['label_HDBSCAN'] = clusters
df.to_pickle(args.df_cases_file)
#%% and evaluate the results with several metrics (not needing ground truth)
evaluation = ClusterMetrics(umap_text_features[result.labels >= 0,], clusters_no_outliers.labels.tolist(),
file_name= "TextClustering/cluster_metrics/HDBSCAN_metrics.pkl")
evaluation.write_to_file()
#%% argsparse section
import sys, os
sys.path.append(os.getcwd())
from TextClustering.argsparse_clustering_preamble import argsparse_preamble
args = argsparse_preamble()
from database_preparation.utils_labeled_datasets import is_text_lst_tokenized
if not is_text_lst_tokenized(args.path2corpus):
print("Error: "+args.path2corpus + '.pkl is not tokenized! '
'Please pass texts list where each text is tokenized (a list of words).')
exit(1)
#%% import section
import pickle
import gensim
import gensim.corpora as corpora
from gensim.models import CoherenceModel
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from gensim.models import CoherenceModel
from tqdm import tqdm
from database_preparation.preprocess import print_meta_data
from TextClassification.classification_for_cluster_evaluation import cross_validate_label_corpus_with_simple_SVM
#%% load the diag and main_diag list
with open(args.path2corpus, 'rb') as f:
diag_lst = pickle.load(f)
print_meta_data(args.path2corpus)
#%% prepare the data for LDA model training
# Create a gensim dictionary, which is a mapping of word IDs to words.
words = corpora.Dictionary(diag_lst)
# Turn each document into a bag of words.
corpus = [words.doc2bow(doc) for doc in diag_lst]  # this is only the bag-of-words corpus, not a trained model yet
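# For illustration: words.doc2bow(['niere', 'niere', 'glomerulonephritis']) returns a sparse
# bag-of-words representation such as [(0, 2), (1, 1)], i.e. (word id, count) pairs.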
#%% train LDA-model with different number of clusters
if args.find_k_value:
limit=21; start=5; step=1
coherence_values = []
model_list, n_cluster, svm_scores = [], [], []
for num_topics in tqdm(range(start, limit, step)):
lda_model = gensim.models.ldamodel.LdaModel(corpus=corpus,
id2word=words,
num_topics=num_topics,
random_state=5,
update_every=1,
passes=10,
alpha='auto',
per_word_topics=True)
coherencemodel = CoherenceModel(model=lda_model, texts=diag_lst, dictionary=words,
coherence='c_v', processes= 1)
coherence_values.append(coherencemodel.get_coherence())
topic_weights = []
for i, row_list in enumerate(lda_model[corpus]):
topic_weights.append([w for i, w in row_list[0]])
predictedCluster = np.argmax(pd.DataFrame(topic_weights).fillna(0).values, axis=1)
svm_scores.append(
cross_validate_label_corpus_with_simple_SVM(predictedCluster, args.path2corpus,
False))
#n_cluster.append(len(lda_model.print_topics(num_words=3)))
n_cluster.append(len(np.unique(np.asarray(predictedCluster))))
print("coherence: " + str(coherencemodel.get_coherence()))
#%% visualize the results
x = range(start, limit, step)
fig, ax1 = plt.subplots()
ax2 = ax1.twinx()
ax3 = ax1.twinx()
ax1.plot(x, coherence_values,'bx-')
ax2.plot(x, n_cluster, 'rx-')
ax3.plot(x, svm_scores, 'gx-')
ax1.set_xlabel('Number of topics')
ax1.yaxis.label.set_color('blue')
ax1.set_ylabel('Coherence score')
ax2.yaxis.label.set_color('red')
ax2.set_ylabel('Number of clusters')
ax3.yaxis.label.set_color('green')
ax3.set_ylabel('svm accuracy')
plt.title('Ellbow-method-like plot')
plt.savefig("TextClustering/plots/elbow_method/LDA_elbow_plot.png", dpi=300)
plt.show()
exit()
#%% train LDA-model
lda_model = gensim.models.ldamodel.LdaModel(corpus=corpus,
id2word=words,
num_topics=args.k_value,
random_state=5,
update_every=1,
passes=10,
alpha='auto',
per_word_topics=True)
#%% get topic weights / features
topic_weights = []
for i, row_list in enumerate(lda_model[corpus]):
topic_weights.append([w for i, w in row_list[0]])
# Array of topic weights
text_features = pd.DataFrame(topic_weights).fillna(0).values
#%% get prediction
predictedCluster= np.argmax(text_features, axis=1)
# and add it to the dataframe
df = pd.read_pickle(args.df_cases_file)
df['label_LDA'] = predictedCluster
from sklearn.decomposition import PCA
pca = PCA(n_components=2)
reduced_features = pca.fit_transform(text_features)
df['pcaX_LDA'] = reduced_features[:, 0]
df['pcaY_LDA'] = reduced_features[:, 1]
#%% and with umap
import umap
umap_text_features2D = umap.UMAP(n_neighbors=15,
n_components=2,
min_dist=0.0, metric='cosine').fit_transform(text_features)
df['umapX_LDA'] = umap_text_features2D[:, 0]
df['umapY_LDA'] = umap_text_features2D[:, 1]
df.to_pickle(args.df_cases_file)
#%% evaluate the model
from TextClustering.utils_metrics import ClusterMetrics
evaluation = ClusterMetrics(text_features, predictedCluster,
file_name= "TextClustering/cluster_metrics/LDA_metrics.pkl")
evaluation.write_to_file()
# %% import section
import pickle
from sklearn.feature_extraction.text import TfidfVectorizer
import matplotlib.pyplot as plt
from sklearn.cluster import KMeans
import pandas as pd
from database_preparation.preprocess import print_meta_data
from TextClassification.classification_for_cluster_evaluation import cross_validate_label_corpus_with_simple_SVM
import umap
from database_preparation.utils_stringpreparation import get_most_frequent_words
import numpy as np
from database_preparation.utils_labeled_datasets import is_text_lst_tokenized, is_text_lst_tfidf_vectorized
from TextClustering.argsparse_clustering_preamble import argsparse_preamble
import os
args = argsparse_preamble()
plot_real_diagnosis = False
def identity(word):
return word
# %% load the data
with open(args.path2corpus, 'rb') as f:
diag_lst = pickle.load(f)
text_is_vectorized = is_text_lst_tfidf_vectorized(args.path2corpus)
if not (is_text_lst_tokenized(args.path2corpus) or text_is_vectorized):
print("Error: " + args.path2corpus + ' has to be a list of tokenized texts or tfidf-vectorized texts!')
exit(1)
print_meta_data(args.path2corpus)
def create_vectorizer(data):
vec = TfidfVectorizer(tokenizer=identity, preprocessor=None, lowercase=False)
vec = vec.fit_transform(data)
return vec
if text_is_vectorized:
text_features = diag_lst
else:
text_features = create_vectorizer(diag_lst)
# %% perform elbow-method to find good cluster number
if args.find_k_value:
Sum_of_squared_distances, svm_values = [], []
K = range(2, 23, 1)
for k in K:
print("iteration #" + str(k))
km = KMeans(n_clusters=k, max_iter=200, n_init=10)
km = km.fit(text_features)
predictedCluster_text_features = km.predict(text_features)
Sum_of_squared_distances.append(km.inertia_)
svm_values.append(
cross_validate_label_corpus_with_simple_SVM(predictedCluster_text_features, args.path2corpus, False))
fig, ax1 = plt.subplots()
#ax2 = ax1.twinx()
ax3 = ax1.twinx()
ax1.plot(K, Sum_of_squared_distances, 'bx-')
#ax2.plot(K, svm_values, 'rx-')
ax3.plot(K, svm_values, 'gx-')
ax1.set_xlabel('K')
ax1.yaxis.label.set_color('blue')
ax1.set_ylabel('Sum_of_squared_distances')
#ax2.yaxis.label.set_color('red')
#ax2.set_ylabel('Number of clusters')
ax3.yaxis.label.set_color('green')
ax3.set_ylabel('svm accuracy')
plt.title('Elbow-method-like plot')
plt.savefig("TextClustering/plots/elbow_method/KMeans_elbow_plot.png", dpi=300)
plt.show()
exit()
km = KMeans(n_clusters=args.k_value, max_iter=200, n_init=10)
km = km.fit(text_features)
predictedCluster_text_features = km.predict(text_features)
umap_text_features2D = umap.UMAP(n_neighbors=15,
n_components=2,
min_dist=0.0, metric='cosine').fit_transform(text_features)
# save umaped vectors:
df = pd.read_pickle(args.df_cases_file)
df['umapX_KMeans'] = umap_text_features2D[:, 0]
df['umapY_KMeans'] = umap_text_features2D[:, 1]
df['label_KMeans'] = predictedCluster_text_features
df.to_pickle(args.df_cases_file)
clusters = km.labels_.tolist()
docs = {'text': diag_lst, 'cluster': clusters}
# %% generate topic words with GT:
if not text_is_vectorized:
frame = pd.DataFrame(docs, index=[clusters])
clusters = []
word_list_GT = []
n_words = 10
for cluster in range(0, args.k_value):
t_frame = frame[frame['cluster'] == cluster]
all_text = " ".join(t_frame['text'].astype(str))
top_words = get_most_frequent_words(all_text, n_words)
clusters.append(cluster)
word_list_GT.append(top_words)
for i in range(0, len(word_list_GT)):
t_token = np.array(word_list_GT[i])
if len(t_token) < n_words + 1:
t_token = np.append(t_token, np.repeat(np.nan, n_words - len(t_token)))
t_token = t_token.reshape((1, -1))
if i == 0:
token_list = t_token
else:
token_list = np.concatenate((token_list, t_token), axis=0)
pd.DataFrame(token_list).to_excel('TextClustering/tables/WordsPerCluster_kmeans.xlsx',
sheet_name="GT for kmeans")
# %% evaluate the model by clustering metrics
from TextClustering.utils_metrics import ClusterMetrics
evaluation = ClusterMetrics(text_features, km.labels_.tolist(),
file_name="TextClustering/cluster_metrics/KMeans_metrics.pkl")
evaluation.write_to_file()
# %% argsparse preamble
import sys, os
sys.path.append(os.getcwd())
from TextClustering.argsparse_clustering_preamble import argsparse_preamble
args = argsparse_preamble()
from database_preparation.utils_labeled_datasets import is_text_lst_tokenized
if is_text_lst_tokenized(args.path2corpus):
print("Error: " + args.path2corpus + ' is tokenized! '
'Please pass texts list where each text is a single string!')
exit(1)
# %% prepare the background
import pickle
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import umap
import hdbscan
from TextClustering.utils_metrics import ClusterMetrics
from sentence_transformers import SentenceTransformer
from database_preparation.preprocess import print_meta_data
embedding_backup_folder = "database/backup_files/"
if not os.path.isdir(embedding_backup_folder):
os.makedirs(embedding_backup_folder)
path_2_pathoBERT = "./LanguageModelling/ger-patho-bert-2"
with open(args.path2corpus, 'rb') as f:
diag_lst = pickle.load(f)
print_meta_data(args.path2corpus)
if args.do_embedding:
# %% load the model
if args.model2use == "German_BERT":
model = SentenceTransformer("Sahajtomar/German-semantic")
elif args.model2use == "Patho_BERT":
model = SentenceTransformer(path_2_pathoBERT)
# %% and apply the embedding-model to the text (only once, since very time-consuming)
if not 'embeddings' in locals():
embeddings = model.encode(diag_lst, show_progress_bar=True)
np.save(embedding_backup_folder + args.model2use + "_embeddingsBackup.npy", embeddings)
# %% load the embeddings from the backup file (if they are not already in memory)
if not 'embeddings' in locals():
embeddings = np.load(embedding_backup_folder + args.model2use + "_embeddingsBackup.npy")
# %% perform umap
umap_embeddings = umap.UMAP(n_neighbors=15,
n_components=5,
metric='cosine').fit_transform(embeddings)
# and perform UMAP dimension reduction for visualization
umap_text_features2D = umap.UMAP(n_neighbors=15,
n_components=2,
min_dist=0.0, metric='cosine').fit_transform(embeddings)
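# the 5-component UMAP embedding above is used for the clustering itself;
# the 2-component projection is only kept for the 2D plots.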
if 'umap_embeddings' in locals():
np.save(embedding_backup_folder + args.model2use + "_umap_embeddingsBackup.npy", umap_embeddings)
# %% perform repetitive clustering to find the best min_cluster_size
if not 'umap_embeddings' in locals():
umap_embeddings = np.load(embedding_backup_folder + args.model2use + "_umap_embeddingsBackup.npy")
if args.find_k_value:
cluster_size = range(5, 40, 2)
s_score, n_cluster = [], []
for i_cluster_size in cluster_size:
cluster = hdbscan.HDBSCAN(min_cluster_size=i_cluster_size,
metric='euclidean',
cluster_selection_method='eom').fit(umap_embeddings)
# and evaluate the results with several metrics (not needing ground truth)
from TextClustering.utils_metrics import ClusterMetrics
evaluation = ClusterMetrics(umap_embeddings, cluster.labels_.tolist())
s_score.append(evaluation.s_score)
n_cluster.append(len(np.unique(cluster.labels_.tolist())))
fig, ax1 = plt.subplots()
ax2 = ax1.twinx()
ax1.plot(cluster_size, s_score, 'bx-')
ax2.plot(cluster_size, n_cluster, 'rx-')
ax1.set_xlabel('Minimal cluster size')
ax1.yaxis.label.set_color('blue')
ax1.set_ylabel('Silhouette Coefficient')
ax2.yaxis.label.set_color('red')
ax2.set_ylabel('Number of clusters')
plt.title('Elbow-method-like plot')
plt.show()
sys.exit()
# %% perform hdbscan-clustering
if not 'umap_embeddings' in locals():
umap_embeddings = np.load(embedding_backup_folder + args.model2use + "_umap_embeddingsBackup.npy")
cluster = hdbscan.HDBSCAN(min_cluster_size=args.k_value,
metric='euclidean',
cluster_selection_method='eom').fit(umap_embeddings)
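# note: for HDBSCAN, the --k_value argument is used as min_cluster_size (the minimal cluster size),
# not as a fixed number of clusters.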
# and print the results
result = pd.DataFrame(umap_text_features2D, columns=['x', 'y'])
result['labels'] = cluster.labels_.tolist() # cluster.labels_
print("cluster indices: " + str(np.unique(result.labels)))
outliers = result.loc[result.labels == -1, :]
clustered = result.loc[result.labels != -1, :]
print(str(len(outliers.x)) + " outliers")
# save the UMAP-projected 2D vectors:
df = pd.read_pickle(args.df_cases_file)
df['umapX_' + args.model2use] = result.x
df['umapY_' + args.model2use] = result.y
# %% update df_cases
df['label_' + args.model2use] = result.labels
df.to_pickle(args.df_cases_file)
# %% and evaluate the results with several metrics (not needing ground truth)
evaluation = ClusterMetrics(umap_embeddings[result.labels >= 0,], clustered.labels.tolist(),
file_name="TextClustering/cluster_metrics/" + args.model2use + "_metrics.pkl")
evaluation.write_to_file()
import sys, os
from tqdm import tqdm
import pandas as pd
import pickle
from database_preparation.preprocess import print_meta_data
from database_preparation.utils_labeled_datasets import is_text_lst_tokenized
sys.path.append(os.getcwd())
# parse arguments:
from TextClustering.argsparse_clustering_preamble import argsparse_preamble
args = argsparse_preamble()
print("arguments:")
print(args)
if is_text_lst_tokenized(args.path2corpus):
print("Error: "+args.path2corpus + '.pkl is tokenized! '
'Please pass texts list where each text is a single string!')
exit(1)
#%% load the data
with open(args.path2corpus, 'rb') as f:
diag_lst = pickle.load(f)
print_meta_data(args.path2corpus)
#%% perform clustering repeatedly (to find a good minimal cluster size)
if args.find_k_value:
from TextClustering.utils_metrics import ClusterMetrics
from TextClustering.basedOn_Embedding.top2vec import Top2Vec  # load our modified version (for visualization)
import matplotlib.pyplot as plt
s_score, n_cluster = [], []
cluster_size = range(3, 25, 2)
for i_cluster_size in tqdm(cluster_size):
#%% perform text-clustering (like in the paper)
hdbscan_args = {'min_cluster_size': i_cluster_size,
'metric': 'euclidean',
'cluster_selection_method': 'eom'}
model = Top2Vec(diag_lst,
embedding_model=args.model2use,
min_count=0,
hdbscan_args=hdbscan_args)
#%% get the clusters
n_cluster.append(model.get_num_topics())
evaluation = ClusterMetrics(model.umap_model.embedding_[model.result.labels >= 0,],
model.clustered.labels.tolist())
s_score.append(evaluation.s_score)
#%% plot the results
fig, ax1 = plt.subplots()
ax2 = ax1.twinx()
ax1.plot(cluster_size, s_score, 'bx-')
ax2.plot(cluster_size, n_cluster, 'rx-')
ax1.set_xlabel('Minimal cluster size')
ax1.yaxis.label.set_color('blue')
ax1.set_ylabel('Silhouette Coefficient')
ax2.yaxis.label.set_color('red')
ax2.set_ylabel('Number of clusters')
plt.title('Elbow-method-like plot')
plt.show()
exit()
#%% perform text-clustering (like in the paper)
from TextClustering.basedOn_Embedding.top2vec import Top2Vec # load our modified version (for visualization)
hdbscan_args = {'min_cluster_size': args.k_value,
'metric': 'euclidean',
'cluster_selection_method': 'eom'}
model = Top2Vec(diag_lst,
embedding_model = args.model2use,
min_count = 0,
hdbscan_args=hdbscan_args)
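# Top2Vec embeds the documents with the chosen embedding model, reduces the embeddings with UMAP
# and clusters the result with HDBSCAN (configured via hdbscan_args above).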
#%% get the words and topics
# by their way
model.get_num_topics()
topic_sizes, topic_nums = model.get_topic_sizes()
# print some info:
outliers = model.umap_model.embedding_[model.result.labels == -1,]
print(f"found {len(topic_nums)} topics")
print(f"found {len(outliers)} outliers.")
topic_words, word_scores, topic_nums = model.get_topics(len(topic_nums))
pd.DataFrame(topic_words).to_excel(
'TextClustering/tables/WordsPerCluster_Top2Vec_' + args.model2use + '.xlsx',
sheet_name= "in-function")
# save umaped vectors and labels:
df = pd.read_pickle(args.df_cases_file)
df['umapX_top2vec'] = model.result.x
df['umapY_top2vec'] = model.result.y
df['label_top2vec'] = model.result.labels
df.to_pickle(args.df_cases_file)
#%% calculate clustering-metrics
from TextClustering.utils_metrics import ClusterMetrics
evaluation = ClusterMetrics(model.umap_model.embedding_[model.result.labels >= 0,], model.clustered.labels.tolist(),
file_name= "TextClustering/cluster_metrics/top2vec_metrics.pkl")
evaluation.write_to_file()
\ No newline at end of file
# -*- coding: iso-8859-1 -*-
import pickle
from os import listdir
from os.path import isfile, join
import numpy as np
import pandas as pd
import database_preparation.utils_labeled_datasets as dt
from TextClassification.classification_for_cluster_evaluation import cross_validate_with_simple_SVM
from CorpusHomogeneity.cluster_entropy import cluster_entropy
from CorpusHomogeneity.text_entropy import corpus_entropy
recalc_cls_accuracy = True
use_always_bow_data_for_svm_accuracy = True
sort_table_by = ['s-score'] # s-score or cls accuracy
table_save_path = 'TextClustering/tables/cluster_metrics_overview'
path2corpus_bow_preprocessed = 'database/bow_prepro_diag.pkl'
path2corpus_embedding_preprocessed = 'database/embedding_prepro_diag.pkl'
scorepath = "TextClustering/cluster_metrics/"
df_cases_file = './database/df_cases.pkl'
def main():
# ########## print cluster scores as latex table: ##################
methodnames = ['KMeans', 'LDA', 'HDBSCAN', 'German_BERT', 'Patho_BERT', 'top2vec', 'GSDPMM']
skipped_methods = []
print(dt.get_all_label_set_ids())
s_scores = []
entropy_scores = []
cls_ac_scores = []
cluster_nums = []
report_nums = []
round_to = 3
for label_set in methodnames:
try:
scores = pd.read_pickle(scorepath + label_set + "_metrics.pkl")[label_set+'_metrics']
except:
print(f"skipping {label_set}.")
skipped_methods.append(label_set)
continue
if label_set in ['German_BERT', 'Patho_BERT', 'top2vec']:
text_corpus_path = path2corpus_embedding_preprocessed
else:
text_corpus_path = path2corpus_bow_preprocessed
try:
s_scores.append(round(scores['s-score'],3))
except:
s_scores.append(None)
try:
cluster_nums.append(str(dt.get_amount_unique_labels(label_set)))
except:
cluster_nums.append(None)
try:
report_nums.append(str(dt.get_amount_reports(label_set)))
except:
report_nums.append(None)
### cls accuracy with svm ###
if recalc_cls_accuracy:
if use_always_bow_data_for_svm_accuracy:
metrics = cross_validate_with_simple_SVM(label_set,
path2corpus_bow_preprocessed,
df_cases_file)
else:
metrics = cross_validate_with_simple_SVM(label_set,
text_corpus_path,
df_cases_file)
print("================ f1-per cluster for cluster-set: " + label_set + " ================")
df = metrics.classes_scores(-1)
print(df.to_latex().replace('{}', 'cluster'))
cls_ac_scores.append(round(np.mean(metrics.scores['accuracy']), round_to))
else:
try:
cls_ac_scores.append(round(scores['svm-accuracy'], round_to))
except:
cls_ac_scores.append(None)
### calculate entropy ###
with open(text_corpus_path, 'rb') as f:
text = pickle.load(f)
df = pd.read_pickle(df_cases_file)
clusters = df['label_'+label_set].tolist()
frame = pd.DataFrame({'text': text, 'cluster': clusters}, index=[clusters])
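# relative entropy: mean within-cluster entropy divided by the entropy of the whole corpus;
# values below 1 indicate that the clusters are more homogeneous than the corpus as a whole.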
ent = cluster_entropy(frame)
ent_mean, ent_std = corpus_entropy(text)
entropy_scores.append(round(ent[0] / ent_mean, round_to))
for methodname in skipped_methods:
methodnames.remove(methodname)
methodnames = [n.replace("_metrics", "") for n in methodnames]
df = pd.DataFrame({'cluster method': methodnames, 's-score': s_scores,
'cls accuracy': cls_ac_scores, 'rel entropy': entropy_scores,
'clusters': cluster_nums, 'corpus size': report_nums})
df.sort_values(by=sort_table_by, inplace=True, ascending=False)
latex_table = df.to_latex(index=False)
print("%================== clustering metric scores =================")
print(latex_table)
print("%===================================\n\n")
with open(table_save_path+'_latex.txt', 'w') as f:
f.write(latex_table)
df.to_excel(table_save_path+'.xlsx')
if __name__ == '__main__':
main()
\ No newline at end of file
import os, sys
# params:
path2corpus_bow_preprocessed_diagnosis = 'database/bow_prepro_diag.pkl'
path2corpus_embedding_preprocessed_diagnosis = 'database/embedding_prepro_diag.pkl'
# check if we are at correct working directory:
workdir = os.getcwd()
if not workdir.endswith('nlp-in-diagnostic-texts-from-nephropathology'):
print(workdir + " is the wrong working directory.")
print("please make sure to run this script from the working directory '.../path/to/nlp-in-diagnostic-texts-from-nephropathology'.")
exit(1)
# add some folders for metrics, plot, tables etc:
if not os.path.isdir('TextClustering/cluster_metrics'):
os.makedirs('TextClustering/cluster_metrics')
if not os.path.isdir('TextClustering/tables'):
os.makedirs('TextClustering/tables')
if not os.path.isdir('TextClustering/plots'):
os.makedirs('TextClustering/plots')
if not os.path.isdir('TextClustering/plots/histograms'):
os.makedirs('TextClustering/plots/histograms')
if not os.path.isdir('TextClustering/plots/UMAP'):
os.makedirs('TextClustering/plots/UMAP')
if not os.path.isdir('TextClustering/plots/PCA'):
os.makedirs('TextClustering/plots/PCA')
# Construct the clustering pipeline. This is a suggestion for how to use all the scripts.
# I also recommend running each clustering script one by one to fine-tune the clusterings (with the argument --find_k_value).
script_queue = [
f"python TextClustering/basedOn_BOW/kmeans_Diagnosis.py --path2corpus {path2corpus_bow_preprocessed_diagnosis} --k_value {10}",
f"python TextClustering/basedOn_BOW/LDA_Diagnosis.py --path2corpus {path2corpus_bow_preprocessed_diagnosis} --k_value {12}",
f"python TextClustering/basedOn_BOW/HDBSCAN_Diagnosis.py --path2corpus {path2corpus_bow_preprocessed_diagnosis} --k_value {10}",
f"python TextClustering/basedOn_BOW/GSDPMM_Diagnosis.py --path2corpus {path2corpus_bow_preprocessed_diagnosis} --k_value {14}",
f"python TextClustering/basedOn_Embedding/BERT_Diagnosis.py --path2corpus {path2corpus_embedding_preprocessed_diagnosis} --do_embedding --model2use German_BERT --k_value {17}",
f"python TextClustering/basedOn_Embedding/BERT_Diagnosis.py --path2corpus {path2corpus_embedding_preprocessed_diagnosis} --do_embedding --model2use Patho_BERT --k_value {8}",
f"python TextClustering/basedOn_Embedding/top2vec_Diagnosis.py --path2corpus {path2corpus_embedding_preprocessed_diagnosis} --model2use doc2vec --k_value {7}",
"python TextClustering/cluster_scores2latextable.py",
"python TextClustering/plot_clustersets.py",
"python TextClustering/generate_topicwords.py",
"python TextClustering/clusterset_histos.py"
]
for script in script_queue:
print("\n########################################### executing ###########################################")
print(script)
print("####################################################################################################\n")
os.system(script)
import database_preparation.utils_labeled_datasets as dt
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import sys, os
import argparse
sys.path.append(os.getcwd())
parser = argparse.ArgumentParser()
parser.add_argument("--df_cases_file", default="database/df_cases.pkl")
args = parser.parse_args()
plot_author_histos = False
cluster = 2
clustersets = ["HDBSCAN", "KMeans", "LDA", "GSDPMM",
"top2vec", "Patho_BERT", "German_BERT"]
df = pd.read_pickle(args.df_cases_file)
authors_labels = df["label_author"]
# plot histograms: how many docs have the same label (cluster index)?
for i,label_set in enumerate(clustersets):
try:
cluster_labels = dt.label_list_as_int_list(df['label_' + label_set])
except:
print(f"skipping {label_set}. it is not in the df_cases_file.")
continue
if plot_author_histos:
authors_of_cluster = [authors_labels[i] for i, label in enumerate(cluster_labels) if
label == cluster]
authors = np.asarray(authors_of_cluster)
x = [-1,0,1,2,3]
h = []
for l in x:
h.append(sum([1 for a in authors if a == l]))
plt.bar(x, height=h)
plt.title(label_set + " authors in cluster " + str(cluster))
file_path = 'TextClustering/plots/histograms/histogram_' + label_set + "_cluster" + str(cluster) + "_authors.png"
else:
labels = np.asarray([l for l in cluster_labels if l != -1])
label_num = dt.get_amount_unique_labels(label_set)
x = np.arange(label_num)
h = []
for l in x:
h.append(sum([1 for label in labels if label == l]))
plt.bar(x, height=h)
plt.xticks(x, x)
plt.title(label_set)
file_path = 'TextClustering/plots/histograms/histogram_' + label_set + ".png"
plt.xticks(x, x)
plt.savefig(file_path, dpi=600)
plt.close()
plt.clf()
print(f"generated {file_path}")
from TextClustering.utils_wordlist import generate_save_topicwords
import pandas as pd
from database_preparation.utils_labeled_datasets import label_list_as_int_list
from database_preparation.preprocess import get_metadata
import pickle
import openpyxl
# parameters:
df_cases_file = "database/df_cases.pkl"
print_latex = False
filter_stop_words = True
path2umap_pics = 'TextClustering/plots/UMAP/'
save_umap_picture_in_table = True
path2corpus_bow_preprocessed = 'database/bow_prepro_diag.pkl'
path2corpus_embedding_preprocessed = 'database/embedding_prepro_diag.pkl'
####### functions ##########
def main():
cluster_sets = ['KMeans', 'LDA', 'HDBSCAN', 'German_BERT', 'Patho_BERT', 'top2vec', 'GSDPMM']
# cluster_sets = ['German_BERT']
df_cases = pd.read_pickle(df_cases_file)
for cluster_set in cluster_sets:
# re-generate the topic words:
excel_file_path = 'TextClustering/tables/WordsPerCluster_' + cluster_set + '.xlsx'
# convert nan values to int(-1):
try:
clusters = label_list_as_int_list(df_cases['label_' + cluster_set])
except:
print(f"skipping {cluster_set}. it is not in the df_cases_file.")
continue
if cluster_set in ['German_BERT', 'Patho_BERT', 'top2vec']:
text_corpus_path = path2corpus_embedding_preprocessed
else:
text_corpus_path = path2corpus_bow_preprocessed
meta_params = get_metadata(text_corpus_path)
with open(text_corpus_path, 'rb') as f:
diag_lst = pickle.load(f)
# do not apply stop word filtering if the corpus is already stopword-filtered!
generate_save_topicwords(clusters, diag_lst, save_excel_file_path=excel_file_path,
n_words=10, print_latex_table=print_latex,
filter_stop_words=filter_stop_words and not meta_params['stopword_filtered'])
if save_umap_picture_in_table:
pic_path = path2umap_pics + cluster_set + "_UMAP.png"
try:
img = openpyxl.drawing.image.Image(pic_path)
wb = openpyxl.load_workbook(excel_file_path)
ws = wb.create_sheet("umap")
img.anchor = 'A1'
img.width = img.width / 2
img.height = img.height / 2
ws.add_image(img)
wb.save(excel_file_path)
print(f"Generated {excel_file_path}")
except:
print("could not load " + pic_path)
print("therefore, cant place umap picture into " + excel_file_path)
######### topic words of authors #########
clusters = label_list_as_int_list(df_cases['label_author'])
excel_file_path = 'TextClustering/tables/WordsPerCluster_authors.xlsx'
with open(path2corpus_bow_preprocessed, 'rb') as f:
diag_lst = pickle.load(f)
generate_save_topicwords(clusters, diag_lst, save_excel_file_path=excel_file_path,
n_words=20, print_latex_table=print_latex,
filter_stop_words=False)
if __name__ == '__main__':
main()
import pandas as pd
from TextClustering.utils_metrics import cluster_scatter_plot
import numpy as np
from database_preparation.utils_labeled_datasets import label_list_as_int_list
clustersets = ["GSDPMM", "KMeans", "LDA", "HDBSCAN",
"top2vec", "Patho_BERT", "German_BERT"]
plot_titles = ["GSDPMM (UMAP representation)", "k-means (UMAP representation)",
"LDA (UMAP representation)", "HDBSCAN (UMAP representation)",
"top2vec (UMAP representation)", "Patho-BERT (UMAP representation)",
"German-BERT (UMAP representation)"]
df_cases_file = "database/df_cases.pkl"
def save_umap_plot(clustersetname, df, title=None):
if not 'label_' + clustersetname in df:
print("skipping " + clustersetname + ", it is not in df_cases_file:")
print(df)
return
predictedCluster_text_features = label_list_as_int_list(df['label_' + clustersetname])
try:
umap_text_features2D = np.asarray([[e for e in df['umapX_' + clustersetname]],
[e for e in df['umapY_' + clustersetname]]])
except:
print("there is no umapX_" + clustersetname + " in database/df_cases.pkl. => skipping")
return
umap_text_features2D = np.transpose(umap_text_features2D)
cluster_scatter_plot(umap_text_features2D, predictedCluster_text_features,
"TextClustering/plots/UMAP/" + clustersetname + "_UMAP.png",
show_plot=False, colorblindfriendly=False, fig_title=title)
if 'label_author' in df:
author_labels = df["label_author"]
cluster_scatter_plot(umap_text_features2D, author_labels,
"TextClustering/plots/UMAP/" + clustersetname + "_UMAP_authors.png",
show_plot=False, colorblindfriendly=True, number_data_points=False
, fig_title=title + ", colored by authors")
if 'label_golden' in df:
golden_labels = df["label_golden"]
cluster_scatter_plot(umap_text_features2D, golden_labels,
"TextClustering/plots/UMAP/" + clustersetname + "_UMAP_goldenlabel.png",
show_plot=False, colorblindfriendly=True
, fig_title=title + " colored with golden labels")
def main():
df = pd.read_pickle(df_cases_file)
for clustersetname in clustersets:
if clustersetname in clustersets:
title = plot_titles[clustersets.index(clustersetname)]
else:
title = None
save_umap_plot(clustersetname, df, title)
# plot author-colored and cluster-colored lda clustersets as pca representation:
if 'label_LDA' in df and 'pcaX_LDA' in df:
predictedCluster_text_features = df['label_LDA']
features2D = np.asarray([[e for e in df['pcaX_LDA']],
[e for e in df['pcaY_LDA']]])
features2D = np.transpose(features2D)
cluster_scatter_plot(features2D, predictedCluster_text_features,
"TextClustering/plots/PCA/LDA_PCA.png",
show_plot=False, colorblindfriendly=False,
fig_title="LDA (PCA representation)")
cluster_scatter_plot(features2D, df["label_author"],
"TextClustering/plots/PCA/LDA_PCA_authors.png",
show_plot=False, colorblindfriendly=True,
number_data_points=False, fig_title='LDA (PCA representation), colored by authors')
if __name__ == '__main__':
main()
import openpyxl
from TextClustering.utils_wordlist import get_top_cluster_words_as_latex_table
from googletrans import Translator # use pip install googletrans==3.1.0a0, 3.0 version is broken
from utils_general import custom_translation
path2table = "WordsPerCluster_HDBSCAN.xlsx"
green = 'FF00FF00'
blue = 'FF4A86E8'
orange = 'FFFF9900'
black = '1'
latex_weak_word = '\\weakcolor'
latex_strong_word = '\\strongcolor'
def color2latex_color(color):
if color == green:
return latex_strong_word
if color == blue:
return latex_weak_word
if color == orange:
return latex_weak_word
# print(f"unknown color: {color}")
return None
def get_annotated_exceltable(ws):
words_list = []
topics = []
colors = []
for idx, col in enumerate(ws.iter_rows(min_row=2, max_row=25, min_col=1, max_col=11)):
if col[0].value is None:
break
words_list.append([])
colors.append([])
for i, cell in enumerate(col):
if i == 0:
topics.append((cell.value, color2latex_color(cell.font.color.rgb)))
else:
words_list[idx].append(cell.value)
colors[idx].append(color2latex_color(cell.font.color.rgb))
# return get_top_cluster_words_as_latex_table(words_list, colors, topics)
return words_list, colors, topics
def main():
wb = openpyxl.load_workbook(path2table)
extraction_methods = ['tf-idf', 'SVM']
cluster_method = 'HDBSCAN'
annotate_svm_as_tfidf = True
print_also_translated_tables = True
translator = Translator()
words_list_tfidf = []
colorstfidf = []
topicstfidf = []
for i, extraction_method in enumerate(extraction_methods):
ws = wb[['TFIDF-based', 'svm-based'][i]]
words_list, colors, topics = get_annotated_exceltable(ws)
if annotate_svm_as_tfidf:
if extraction_method != 'tf-idf':
topics = topicstfidf
for j, words in enumerate(words_list):
for k, word in enumerate(words):
if word in words_list_tfidf[j]:
colors[j][k] = colorstfidf[j][words_list_tfidf[j].index(word)]
else:
words_list_tfidf, colorstfidf, topicstfidf = words_list, colors, topics
# print german topic words:
label = 'table_cluster_topics_' + cluster_method + '_' + extraction_method + '_ger'
# print("\n processing " + label+"...\n")
description = f'Annotated German topic words, extracted from the {cluster_method} cluster-set, ' \
f'using the {extraction_method} based extraction method.'
latex = get_top_cluster_words_as_latex_table(words_list, colors, topics).replace('DESCRIPTON',
description).replace(
'EXTRACTIONMETHOD', extraction_method).replace(
'LABEL', label
)
print(latex)
# print english topic words:
if print_also_translated_tables:
description = f'Annotated topic words (translated from German to English), ' \
f'extracted from the {cluster_method} cluster-set, ' \
f'using the {extraction_method} based extraction method.'
label = 'table_cluster_topics_' + cluster_method + '_' + extraction_method + '_eng'
word_list_eng = [[word if word.lower() not in custom_translation.keys() else custom_translation[word.lower()]
for word in words] for words in words_list]
topics_eng = [(translator.translate(topic[0], src='de').text, topic[1]) for topic in topics]
latex = get_top_cluster_words_as_latex_table(word_list_eng, colors, topics_eng).replace('DESCRIPTON',
description).replace(
'EXTRACTIONMETHOD', extraction_method).replace(
'LABEL', label
)
print(latex)
if __name__ == '__main__':
main()
#%% import
from sklearn.metrics import silhouette_score, calinski_harabasz_score, davies_bouldin_score
from validclust import cop, dunn
from sklearn.metrics import pairwise_distances
import pandas as pd
import numpy as np
import os
#%% class definition
class ClusterMetrics:
def __init__(self, feature_matrix, feature_label, file_name = "cluster_metrics.pkl"):
self.file_name = file_name
self.feature_matrix = feature_matrix
if (type(feature_matrix).__name__) == "csr_matrix":
self.__feature_matrix_array__ = feature_matrix.toarray()
elif (type(feature_matrix).__name__) == "list":
self.__feature_matrix_array__ = np.array(feature_matrix)
else:
self.__feature_matrix_array__ = feature_matrix
self.feature_label = feature_label
self.__feature_label_array = np.array(feature_label)
self.__distance_matrix__ = pairwise_distances(self.__feature_matrix_array__)
# calculate the Silhouette Coefficient (values from -1 to 1; higher is better)
self.s_score = silhouette_score(feature_matrix, feature_label)
# calculate the Calinski-Harabasz Index (the higher the value, the better)
self.ch_index = calinski_harabasz_score(self.__feature_matrix_array__, feature_label)
# calculate the Davies-Bouldin Index (the lower the value, the better)
self.db_score = davies_bouldin_score(self.__feature_matrix_array__, feature_label)
# calculate COP CVI
self.cop = cop(self.__feature_matrix_array__, self.__distance_matrix__, self.__feature_label_array)
# calculate Dunn CVI
self.dunn = dunn(self.__distance_matrix__, self.__feature_label_array)
# placeholders for entropy and SVM accuracy (filled in externally)
self.entropy = None
self.svm_accuracy = None
def write_to_file(self):
results = [np.round(self.s_score,3),
np.round(self.ch_index,3),
np.round(self.db_score,3),
np.round(self.cop,3),
np.round(self.dunn,3),
self.entropy,
self.svm_accuracy]
head, tail = os.path.split(self.file_name)
tail = tail[:-4]
df = pd.DataFrame(results,
index =['s-score', 'ch-index', 'db-score', 'cop', 'dunn-score', 'entropy', 'svm-accuracy'],
columns =[tail])
df.to_pickle(self.file_name)
print(df)
def __str__(self):
return "s-score: " + str(np.round(self.s_score,2)) + "[-1:1]" + "\n" + \
"ch-index: " + str(np.round(self.ch_index,2)) + " [0:]" "\n" + \
"db-score: " + str(np.round(self.db_score, 2)) + "[0:]" "\n" + \
"cop: " + str(np.round(self.cop, 2)) + " []" "\n" + \
"dunn: " + str(np.round(self.dunn, 2)) + " []" "\n" + \
"entropy: " + str(self.entropy) + " []" "\n" + \
"svm-accuracy: " + str(self.svm_accuracy) + " []"
#%% define plot functions for PCA
from sklearn.decomposition import PCA
import matplotlib.pyplot as plt
def plot_pca(text_features, labels, file_path = [], show_plot = True):
pca = PCA(n_components=2)
reduced_features = pca.fit_transform(text_features)
plt.close()
plt.scatter(x=reduced_features[:, 0], y=reduced_features[:, 1],
c=np.int8(labels), cmap="tab20")
plt.colorbar()
plt.title('PCA-representation')
if bool(file_path):
plt.savefig(file_path)
if show_plot:
plt.show()
#%% define plot function for T-SNE
from sklearn.manifold import TSNE
def plot_tsne(text_features, labels, file_path = [], show_plot = True):
tsne = TSNE(n_components=2, verbose=1, random_state=123)
reduced_features = tsne.fit_transform(text_features)
plt.close()
plt.scatter(x=reduced_features[:, 0], y=reduced_features[:, 1],
c=np.int8(labels), cmap="tab20")
plt.colorbar()
plt.title('T-SNE-representation')
if bool(file_path):
plt.savefig(file_path)
if show_plot:
plt.show()
#%% define plot function for UMAP
import umap
import seaborn as sns
def cluster_scatter_plot(umap_text_features2D, labels, file_path = [],
show_plot=True, colorblindfriendly=True,
number_data_points=True, fig_title=None):
'''umap_text_features2D = umap.UMAP(n_neighbors=15,
n_components=2,
min_dist=0.0, metric='cosine').fit_transform(text_features)'''
plt.close()
# plot data points with invalid labels (label == -1) in black:
x = [e for i, e in enumerate(umap_text_features2D[:, 0]) if labels[i] == -1]
y = [e for i, e in enumerate(umap_text_features2D[:, 1]) if labels[i] == -1]
if len(x) > 0:
plt.scatter(x=x, y=y, c='black', marker='.')
# plot validly labeled data points:
x_val = [e for i,e in enumerate(umap_text_features2D[:, 0]) if labels[i] != -1]
y_val = [e for i,e in enumerate(umap_text_features2D[:, 1]) if labels[i] != -1]
valid_labels = [l for l in labels if l != -1]
if colorblindfriendly:  # use the colorblind palette (it has 10 colors)
style = []
for label in valid_labels:
if label <= 9: #0-9
style.append(0)
elif label >= 20: #20-inf
style.append(1)
else: #10-19
style.append(2)
sns.scatterplot(x=x_val, y=y_val, hue=valid_labels,
palette="colorblind", style=style,
legend=True, linewidth=.3)
if number_data_points:
nummerate_clusters_in_plot(x_val, y_val, valid_labels)
else:  # use tab20 (it has 20 different colors)
x = [e for i, e in enumerate(x_val) if valid_labels[i] <= 19]
y = [e for i, e in enumerate(y_val) if valid_labels[i] <= 19]
c = [e for e in valid_labels if e <= 19]
plt.scatter(x=x,
y=y,
c=np.int8(c),
cmap="tab20", edgecolors='white', linewidth=.3
, marker='o')
if number_data_points:
nummerate_clusters_in_plot(x,y,c)
'''plt.legend(handles=scatter.legend_elements()[0],
labels=[str(l) for l in c], loc="best")'''
plt.colorbar(values=[int(e) for e in np.unique(np.asarray(c))])
c = [e for e in valid_labels if e > 19]
if len(c)>0:
x = [e for i, e in enumerate(x_val) if valid_labels[i] > 19]
y = [e for i, e in enumerate(y_val) if valid_labels[i] > 19]
plt.scatter(x=x, y=y,
c=np.int8(c),
cmap="tab20", edgecolors='white', linewidth=.3
, marker='P')
if number_data_points:
nummerate_clusters_in_plot(x, y, c)
if fig_title is None:
if bool(file_path):
import os
fig_title = os.path.basename(file_path)
else:
fig_title = "UMAP"
plt.title(fig_title.replace(".png",""))
if bool(file_path):
print("generated "+file_path)
plt.savefig(file_path,dpi=300)
if show_plot:
plt.show()
def nummerate_clusters_in_plot(x,y,labels):
annotated_labels = []
for i, label in enumerate(labels):
if label not in annotated_labels:
plt.annotate(label, (x[i], y[i]))
annotated_labels.append(label)
def Jaccard_Similarity(doc1, doc2):
if isinstance(doc1, list):
doc1 = " ".join(doc1)
doc2 = " ".join(doc2)
# List the unique words in a document
words_doc1 = set(doc1.lower().split())
words_doc2 = set(doc2.lower().split())
#print(words_doc1)
#print(words_doc2)
# Find the intersection of words list of doc1 & doc2
intersection = words_doc1.intersection(words_doc2)
# Find the union of words list of doc1 & doc2
union = words_doc1.union(words_doc2)
# Calculate Jaccard similarity score
# using length of intersection set divided by length of union set
return float(len(intersection)) / len(union)
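# example (sketch): Jaccard_Similarity("der Befund zeigt", "der Befund fehlt")
# shares {"der", "befund"} out of 4 unique words -> 2 / 4 = 0.5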
#%%
import numpy as np
def get_distance_matrix(str_list):
dist_matrix = np.zeros(shape=(len(str_list), len(str_list)))
# calculate the upper triangle
for i in range(0, len(str_list)):
for j in range(i+1, len(str_list)):
dist_matrix[i][j] = Jaccard_Similarity(str_list[i], str_list[j])
# mirror the upper triangle into the lower triangle
for i in range(0, len(str_list)):
for j in range(0, len(str_list)):
if i == j:
dist_matrix[i][j] = 0
elif i > j:
dist_matrix[i][j] = dist_matrix[j][i]
return dist_matrix
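# note: the matrix entries are Jaccard similarities (1 = identical word sets, 0 = disjoint),
# not distances; use 1 - value if a true distance is needed.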
{"source_data": "../DataNephroTexts/description", "tokenized": true, "cased": false, "stopword_filtered": true, "use_combiner": true, "use_replacer": true, "lemma_mode": 3, "punct_mode": 2, "number_mode": 3}
\ No newline at end of file
{"source_data": "../DataNephroTexts/diagnosis", "tokenized": true, "cased": false, "stopword_filtered": true, "use_combiner": true, "use_replacer": true, "lemma_mode": 3, "punct_mode": 2, "number_mode": 3}
\ No newline at end of file
{"source_data": "../DataNephroTexts/description", "tokenized": false, "cased": true, "stopword_filtered": false, "use_combiner": true, "use_replacer": true, "lemma_mode": 4, "punct_mode": 1, "number_mode": 3}
\ No newline at end of file
{"source_data": "../DataNephroTexts/diagnosis", "tokenized": false, "cased": true, "stopword_filtered": false, "use_combiner": true, "use_replacer": true, "lemma_mode": 4, "punct_mode": 1, "number_mode": 3}
\ No newline at end of file
from transformers import AutoModelForMaskedLM, AutoTokenizer
import pickle
# script parameters:
modelname = "bert-base-german-cased"
path2corpus_embedding_preprocessed_diagnosis = 'database/embedding_prepro_diag.pkl'
path2corpus_embedding_preprocessed_description = 'database/embedding_prepro_desc.pkl'
tokenizer = AutoTokenizer.from_pretrained(modelname)
model = AutoModelForMaskedLM.from_pretrained(modelname)
unknown_id = tokenizer.convert_tokens_to_ids(tokenizer.unk_token)
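# unknown_id is the id of the [UNK] token; a word counts as out-of-vocabulary (OOV) below
# if the tokenizer has to fall back to [UNK] to encode it.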
with open(path2corpus_embedding_preprocessed_description, 'rb') as f:
micro_texts = pickle.load(f)
with open(path2corpus_embedding_preprocessed_diagnosis, 'rb') as f:
diag_texts = pickle.load(f)
def find_oov_cases(texts):
oov_cases = 0
for text_num, text in enumerate(texts):
if unknown_id in tokenizer.encode(text):
tokens = text.split(" ")
for i, token in enumerate(tokens):
if unknown_id in tokenizer.encode(token):
oov_cases += 1
print("found OOV case in text " + str(text_num))
print("the word \'" + str(token) + "\' in " + str(tokens[i - 2:i + 2]) + " is OOV")
return oov_cases
oov_sum = find_oov_cases(micro_texts) + find_oov_cases(diag_texts)
print("\nFinished. Found " + str(oov_sum) + " OOV cases (see above).")
\ No newline at end of file
# -*- coding: iso-8859-1 -*-
import os
# params:
path_to_reports = '../DataNephroTexts/reports'
author_names = "Name1 Name2 Name3 Name4" ## <- Type in the names of the pathologists of your institute!
splitted_reports_folder_path = '../DataNephroTexts'
path2corpus_bow_preprocessed_diagnosis = 'database/bow_prepro_diag.pkl'
path2corpus_embedding_preprocessed_diagnosis = 'database/embedding_prepro_diag.pkl'
path2corpus_bow_preprocessed_description = 'database/bow_prepro_desc.pkl'
path2corpus_embedding_preprocessed_description = 'database/embedding_prepro_desc.pkl'
# check if we are at correct working directory:
workdir = os.getcwd()
if not workdir.endswith('nlp-in-diagnostic-texts-from-nephropathology'):
print(workdir + " is the wrong working directory.")
print("please make sure to run this script from the working directory '.../path/to/nlp-in-diagnostic-texts-from-nephropathology'.")
exit(1)
preparation_queue = [
"python database_preparation/split_reports.py --path_to_reports " + path_to_reports + " --target_folder_path " + splitted_reports_folder_path + " --author_names \"" + author_names + '\"',
f"python database_preparation/preprocess.py --path_to_preprocessing_params {path2corpus_bow_preprocessed_diagnosis.replace('.pkl','_meta.json')} --target_path {path2corpus_bow_preprocessed_diagnosis}",
f"python database_preparation/preprocess.py --path_to_preprocessing_params {path2corpus_embedding_preprocessed_diagnosis.replace('.pkl','_meta.json')} --target_path {path2corpus_embedding_preprocessed_diagnosis}",
f"python database_preparation/preprocess.py --path_to_preprocessing_params {path2corpus_bow_preprocessed_description.replace('.pkl','_meta.json')} --target_path {path2corpus_bow_preprocessed_description}",
f"python database_preparation/preprocess.py --path_to_preprocessing_params {path2corpus_embedding_preprocessed_description.replace('.pkl','_meta.json')} --target_path {path2corpus_embedding_preprocessed_description}",
]
for script in preparation_queue:
print("\n########################################### executing ###########################################")
print(script)
print("####################################################################################################\n")
os.system(script)
# -*- coding: iso-8859-1 -*-
import sys, os
import pandas as pd
from database_preparation.utils_stringpreparation import read_german_text
import argparse
def amount_names(text):
return len(text.split('Dr.'))-1
def cut_off_by_keywords(text, keywords=['Nachtragsbefund','Nachbericht']):
'''
cuts off (removes) the text part that begins with any of the passed keyword(s)
and returns the new (shortened) text.
'''
for keyword in keywords:
if keyword in text:
text = text[:text.index(keyword)]
return text
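# example (sketch): cut_off_by_keywords("Befund ... Nachbericht ...") returns everything before "Nachbericht".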
def get_names(text):
names = []
for parts in text.split('Dr. med.')[1:]:
tokens = parts.split(' ')
for token in tokens:
if '.' in token:
continue
if True in [c.isdigit() for c in token]:
continue
if 'Tel' in token:
continue
if token in '- war Befundverwendung für wissenschaftliche Zwecke oder Gutachten nur mit Genehmigung des Befunders OA PD':
continue
names.append(token)
return names
def add_author_labels_to_df_cases(path_to_end_sections, authors, df_cases_file = "database/df_cases.pkl"):
df = pd.read_pickle(df_cases_file)
filenames = df["end_text_files"]
author_labels = []
print(f"\nLabeling df_cases file with authors. Searching for {authors} in {path_to_end_sections}")
for idx, filename in enumerate(filenames):
text = cut_off_by_keywords(read_german_text(path_to_end_sections + '/' + filename))
# detect authors in text
authors_in_text = [0 for a in range(len(authors))]
for j, author in enumerate(authors):
if author in text:
authors_in_text[j] = 1
# binary encoding of which authors were detected (currently not used further):
autor_combination_as_decimal = sum([pow(2, i) * n for i, n in enumerate(authors_in_text)])
# if exactly one author was detected, use its index as the label, otherwise use -1:
if sum(authors_in_text) == 1:
label = authors_in_text.index(1)
else:
label = -1
author_labels.append(label)
df['label_author'] = author_labels
df.to_pickle(df_cases_file)
print("=> finished. Results:")
for i, author in enumerate(authors):
num = 0
for label in author_labels:
if label == i:
num += 1
print(author + " accured " + str(num) + " times")
sum_no_author = 0
for label in author_labels:
if label == -1:
sum_no_author = sum_no_author + 1
print(str(sum_no_author) + " unknown authors.")
return True
def main():
# parse arguments:
sys.path.append(os.getcwd())
parser = argparse.ArgumentParser()
parser.add_argument("--path_to_end_sections",
default='../DataNephroTexts/end')
parser.add_argument("--author_names",
default="Name1 Name2")
args = parser.parse_args()
authors = args.author_names.split(' ')
add_author_labels_to_df_cases(args.path_to_end_sections, authors)
if __name__ == '__main__':
main()
\ No newline at end of file
import pandas as pd
import pickle
import numpy as np
import nltk
from sklearn.feature_extraction.text import TfidfVectorizer
from database_preparation.preprocess import print_meta_data, prepro_params_2_string
# parameters:
df_cases_file = "database/df_cases.pkl"
text_corpus_paths = ['database/embedding_prepro_diag.pkl',
'database/bow_prepro_diag.pkl',
'database/embedding_prepro_desc.pkl',
'database/bow_prepro_desc.pkl']
vector_corpus_paths = ['database/diagnosis_texts_vectorized_DR_preprocessed.pkl',
'database/diagnosis_texts_vectorized_bow_preprocessed.pkl',
'database/description_texts_vectorized_DR_preprocessed.pkl',
'database/description_texts_vectorized_bow_preprocessed.pkl']
####### functions ##########
def identity(words):
return words
def get_trained_tfidf(texts):
vec = TfidfVectorizer(tokenizer=identity, preprocessor=None, lowercase=False)
return vec.fit_transform(texts)
def save_vectorized_text(text_corpus_path, vector_corpus_path):
with open(text_corpus_path, 'rb') as f:
text_lst = pickle.load(f)
text1 = np.asarray(text_lst[0])
text_lst_is_tokenized = bool(text1.ndim)
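# if the corpus is stored as plain strings, tokenize it first so that every corpus
# goes through the same TF-IDF vectorization path below.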
if not text_lst_is_tokenized:
tokenized_texts = []
for t_text in text_lst:
tokenized_texts.append(nltk.tokenize.word_tokenize(t_text, language='german'))
text_lst = tokenized_texts
del tokenized_texts
vectorized_text = get_trained_tfidf(text_lst)
with open(vector_corpus_path, 'wb') as f:
pickle.dump(vectorized_text, f)
print(f"saved {vector_corpus_path}")
# save metadata:
'''with open(text_corpus_path.replace('.pkl', '_meta.pkl'), 'rb') as f:
params = pickle.load(f)
metadata_text = prepro_params_2_string(params)
with open(vector_corpus_path.replace('.pkl', '_meta.txt'), 'w') as f:
f.write(metadata_text)'''
def main():
for i, text_corpus_path in enumerate(text_corpus_paths):
save_vectorized_text(text_corpus_path, vector_corpus_paths[i])
if __name__ == '__main__':
main()