Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CU-2hz5ump deid mulitprocessing #393

Merged
merged 23 commits into from
Feb 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
a2a4985
CU-2hz5ump: Separate the text replacement in deid
mart-r Jan 24, 2024
27bd7a7
CU-2hz5ump: Fix some indentation on multiprocessing methods in CAT
mart-r Jan 26, 2024
3d45bd1
CU-2hz5ump: Add method to deid multithreaded
mart-r Jan 26, 2024
bc799ab
CU-2hz5ump: Add tests for deid multiprocessing
mart-r Jan 26, 2024
ea6a05c
CU-2hz5ump: Fix return type for multiprocessing deid method
mart-r Jan 26, 2024
7499d38
CU-2hz5ump: Remove unused import
mart-r Jan 26, 2024
164e479
CU-2hz5ump: Fix typing issue within deid multi texts method
mart-r Jan 26, 2024
5c049d0
CU-2hz5ump: Add removal parts to deid tests
mart-r Feb 1, 2024
37c2257
CU-2hz5ump: Add error handling with message to deid multiprocessing i…
mart-r Feb 1, 2024
73a9ac4
CU-2hz5ump: Unpin mypy for dev requirements
mart-r Feb 1, 2024
d360aec
CU-2hz5ump: Fix mypy unpin typo
mart-r Feb 1, 2024
c7e8a00
CU-2hz5ump: Force later version of mypy
mart-r Feb 1, 2024
6f2aca9
CU-2hz5ump: Force mypy extensions to newer version
mart-r Feb 1, 2024
5d1b2e2
CU-2hz5ump: Add 20 minute timeout to main workflow
mart-r Feb 2, 2024
78b9145
CU-2hz5ump: Add 20 minute timeout to main workflow (build)
mart-r Feb 5, 2024
39a76b8
CU-2hz5ump: Add 19 minute timeout to tests step of main workflow
mart-r Feb 5, 2024
456f1bb
CU-2hz5ump: Move to a 17 minute timeout to tests step of main workflow
mart-r Feb 5, 2024
5e22334
CU-2hz5ump: Add a 10 minute timeout for multiprocessing DeID tests
mart-r Feb 6, 2024
c2a54e4
Revert "CU-2hz5ump: Add a 10 minute timeout for multiprocessing DeID …
mart-r Feb 7, 2024
8a9360a
CU-2hz5ump: Add a 3 minute timeout (through a decorator) to multiproc…
mart-r Feb 7, 2024
c64a924
CU-2hz5ump: Remove overly strict DeID test
mart-r Feb 7, 2024
3b887ef
CU-2hz5ump: Add condition for number of results for multiprocessing D…
mart-r Feb 8, 2024
60a0a20
Merge branch 'master' into CU-2hz5ump-deid-mulitprocessing
mart-r Feb 8, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ jobs:
flake8 medcat
- name: Test
run: |
python -m unittest discover
timeout 17m python -m unittest discover
continue-on-error: true

publish-to-test-pypi:

Expand All @@ -43,6 +44,7 @@ jobs:
github.event_name == 'push' &&
startsWith(github.ref, 'refs/tags') != true
runs-on: ubuntu-20.04
timeout-minutes: 20
concurrency: publish-to-test-pypi
needs: [build]

Expand Down
43 changes: 26 additions & 17 deletions medcat/cat.py
Original file line number Diff line number Diff line change
Expand Up @@ -1005,11 +1005,11 @@ def get_entities(self,
return out

def get_entities_multi_texts(self,
texts: Union[Iterable[str], Iterable[Tuple]],
only_cui: bool = False,
addl_info: List[str] = ['cui2icd10', 'cui2ontologies', 'cui2snomed'],
n_process: Optional[int] = None,
batch_size: Optional[int] = None) -> List[Dict]:
texts: Union[Iterable[str], Iterable[Tuple]],
only_cui: bool = False,
addl_info: List[str] = ['cui2icd10', 'cui2ontologies', 'cui2snomed'],
n_process: Optional[int] = None,
batch_size: Optional[int] = None) -> List[Dict]:
"""Get entities

Args:
Expand Down Expand Up @@ -1053,6 +1053,15 @@ def get_entities_multi_texts(self,
for o in out:
if o is not None:
o.pop('text', None)
except RuntimeError as e:
if e.args == ('_share_filename_: only available on CPU',):
raise ValueError("Issue while performing multiprocessing. "
"This is mostly likely to happen when "
"using NER models (i.e DeId). If that is "
"the case you could either a) save the "
"model on disk and then load it back up; "
"or b) install cpu-only toch.") from e
raise e
finally:
self.pipe.reset_error_handler()

Expand Down Expand Up @@ -1375,20 +1384,20 @@ def multiprocessing_pipe(self, in_data: Union[List[Tuple], Iterable[Tuple]],
return_dict: bool = True,
batch_factor: int = 2) -> Union[List[Tuple], Dict]:
return self.multiprocessing_batch_docs_size(in_data=in_data, nproc=nproc,
batch_size=batch_size,
only_cui=only_cui,
addl_info=addl_info,
return_dict=return_dict,
batch_factor=batch_factor)
batch_size=batch_size,
only_cui=only_cui,
addl_info=addl_info,
return_dict=return_dict,
batch_factor=batch_factor)

def multiprocessing_batch_docs_size(self,
in_data: Union[List[Tuple], Iterable[Tuple]],
nproc: Optional[int] = None,
batch_size: Optional[int] = None,
only_cui: bool = False,
addl_info: List[str] = ['cui2icd10', 'cui2ontologies', 'cui2snomed'],
return_dict: bool = True,
batch_factor: int = 2) -> Union[List[Tuple], Dict]:
in_data: Union[List[Tuple], Iterable[Tuple]],
nproc: Optional[int] = None,
batch_size: Optional[int] = None,
only_cui: bool = False,
addl_info: List[str] = ['cui2icd10', 'cui2ontologies', 'cui2snomed'],
return_dict: bool = True,
batch_factor: int = 2) -> Union[List[Tuple], Dict]:
"""Run multiprocessing NOT FOR TRAINING.

This method batches the data based on the number of documents as specified by the user.
Expand Down
39 changes: 37 additions & 2 deletions medcat/utils/ner/deid.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@
- config
- cdb
"""
from typing import Union, Tuple, Any
from typing import Union, Tuple, Any, List, Iterable, Optional

from medcat.cat import CAT
from medcat.utils.ner.model import NerModel

from medcat.utils.ner.helpers import _deid_text as deid_text
from medcat.utils.ner.helpers import _deid_text as deid_text, replace_entities_in_text


class DeIdModel(NerModel):
Expand Down Expand Up @@ -72,8 +72,43 @@ def deid_text(self, text: str, redact: bool = False) -> str:
Returns:
str: The deidentified text.
"""
self.cat.get_entities
return deid_text(self.cat, text, redact=redact)

def deid_multi_texts(self,
texts: Union[Iterable[str], Iterable[Tuple]],
redact: bool = False,
addl_info: List[str] = ['cui2icd10', 'cui2ontologies', 'cui2snomed'],
n_process: Optional[int] = None,
batch_size: Optional[int] = None) -> List[str]:
"""Deidentify text on multiple branches

Args:
texts (Union[Iterable[str], Iterable[Tuple]]): Text to be annotated
redact (bool): Whether to redact the information.
addl_info (List[str], optional): Additional info. Defaults to ['cui2icd10', 'cui2ontologies', 'cui2snomed'].
n_process (Optional[int], optional): Number of processes. Defaults to None.
batch_size (Optional[int], optional): The size of a batch. Defaults to None.

Returns:
List[str]: List of deidentified documents.
"""
entities = self.cat.get_entities_multi_texts(texts, addl_info=addl_info,
n_process=n_process, batch_size=batch_size)
out = []
for raw_text, _ents in zip(texts, entities):
ents = _ents['entities']
text: str
if isinstance(raw_text, tuple):
text = raw_text[1]
elif isinstance(raw_text, str):
text = raw_text
else:
raise ValueError(f"Unknown raw text: {type(raw_text)}: {raw_text}")
new_text = replace_entities_in_text(text, ents, get_cui_name=self.cat.cdb.get_name, redact=redact)
out.append(new_text)
return out

@classmethod
def load_model_pack(cls, model_pack_path: str) -> 'DeIdModel':
"""Load DeId model from model pack.
Expand Down
13 changes: 11 additions & 2 deletions medcat/utils/ner/helpers.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Callable, Dict

from medcat.utils.data_utils import count_annotations
from medcat.cdb import CDB

Expand Down Expand Up @@ -27,11 +29,18 @@ def _deid_text(cat, text: str, redact: bool = False) -> str:
Returns:
str: The de-identified document.
"""
new_text = str(text)
entities = cat.get_entities(text)['entities']
return replace_entities_in_text(text, entities, cat.cdb.get_name, redact=redact)


def replace_entities_in_text(text: str,
entities: Dict,
get_cui_name: Callable[[str], str],
redact: bool = False) -> str:
new_text = str(text)
for ent in sorted(entities.values(), key=lambda ent: ent['start'], reverse=True):
r = "*"*(ent['end']-ent['start']
) if redact else cat.cdb.get_name(ent['cui'])
) if redact else get_cui_name(ent['cui'])
new_text = new_text[:ent['start']] + f'[{r}]' + new_text[ent['end']:]
return new_text

Expand Down
1 change: 1 addition & 0 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ mypy-extensions>=1.0.0
types-aiofiles==0.8.3
types-PyYAML==6.0.3
types-setuptools==57.4.10
timeout-decorator==0.5.0
1 change: 1 addition & 0 deletions tests/resources/deid_test_data.json

Large diffs are not rendered by default.

70 changes: 69 additions & 1 deletion tests/utils/ner/test_deid.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@

from medcat.ner import transformers_ner

from spacy.tokens import Doc
from spacy.tokens import Doc, Span

from typing import Any, List, Tuple
import os
import json
import tempfile

import unittest
import timeout_decorator

FILE_DIR = os.path.dirname(os.path.realpath(__file__))

Expand All @@ -20,6 +23,9 @@
TRAIN_DATA = os.path.join(FILE_DIR, "..", "..",
"resources", "deid_train_data.json")

TEST_DATA = os.path.join(FILE_DIR, "..", "..",
"resources", "deid_test_data.json")


class DeIDmodelCreationTests(unittest.TestCase):

Expand Down Expand Up @@ -57,6 +63,17 @@ def train_model_once(model: deid.DeIdModel,
) -> Tuple[Tuple[Any, Any, Any], deid.DeIdModel]:
if not _trained:
retval = model.train(TRAIN_DATA)
# mpp = 'temp/deid_multiprocess/dumps/temp_model_save'
# NOTE: it seems that after training the model leaves
# it in a state where it can no longer be used
# for multiprocessing. So in order to avoid that
# we save the model on disk and load it agains
with tempfile.TemporaryDirectory() as dir_name:
print("Saving model on disk")
mpn = model.cat.create_model_pack(dir_name)
print("Loading model")
model = deid.DeIdModel.load_model_pack(os.path.join(dir_name, mpn))
print("Loaded model off disk")
_trained.append((retval, model))
return _trained[0]

Expand Down Expand Up @@ -105,7 +122,10 @@ def setUpClass(cls) -> None:
def test_model_works_deid_text(self):
anon_text = self.deid_model.deid_text(input_text)
self.assertIn("[DOCTOR]", anon_text)
self.assertNotIn("M. Sully", anon_text)
self.assertIn("[HOSPITAL]", anon_text)
# self.assertNotIn("Dublin", anon_text)
self.assertNotIn("7 Eccles Street", anon_text)

def test_model_works_dunder_call(self):
anon_doc = self.deid_model(input_text)
Expand All @@ -115,4 +135,52 @@ def test_model_works_deid_text_redact(self):
anon_text = self.deid_model.deid_text(input_text, redact=True)
self.assertIn("****", anon_text)
self.assertNotIn("[DOCTOR]", anon_text)
self.assertNotIn("M. Sully", anon_text)
self.assertNotIn("[HOSPITAL]", anon_text)
# self.assertNotIn("Dublin", anon_text)
self.assertNotIn("7 Eccles Street", anon_text)

class DeIDModelMultiprocessingWorks(unittest.TestCase):
processes = 2

@classmethod
def setUpClass(cls) -> None:
Span.set_extension('link_candidates', default=None, force=True)
_add_model(cls)
cls.deid_model = train_model_once(cls.deid_model)[1]
with open(TEST_DATA) as f:
raw_data = json.load(f)
cls.data = []
for project in raw_data['projects']:
for doc in project['documents']:
cls.data.append((f"{project['name']}_{doc['name']}", doc['text']))

def assertTextHasBeenDeIded(self, text: str, redacted: bool):
if not redacted:
for cui in self.deid_model.cdb.cui2names:
cui_name = self.deid_model.cdb.get_name(cui)
if cui_name in text:
# all good
return
else:
# if redacted, only check once...
if "******" in text:
# all good
return
raise AssertionError("None of the CUIs found")

@timeout_decorator.timeout(3 * 60) # 3 minutes max
def test_model_can_multiprocess_no_redact(self):
processed = self.deid_model.deid_multi_texts(self.data, n_process=self.processes)
self.assertEqual(len(processed), 5)
for tid, new_text in enumerate(processed):
with self.subTest(str(tid)):
self.assertTextHasBeenDeIded(new_text, redacted=False)

@timeout_decorator.timeout(3 * 60) # 3 minutes max
def test_model_can_multiprocess_redact(self):
processed = self.deid_model.deid_multi_texts(self.data, n_process=self.processes, redact=True)
self.assertEqual(len(processed), 5)
for tid, new_text in enumerate(processed):
with self.subTest(str(tid)):
self.assertTextHasBeenDeIded(new_text, redacted=True)
Loading