Loading DICOM_anonymizer.py 0 → 100644 +359 −0 Original line number Diff line number Diff line # #! /usr/bin/env python3 # Some references: # - pydicom anonymization tutorial: # https://pydicom.github.io/pydicom/stable/auto_examples/metadata_processing/plot_anonymize.html # - other script: # https://github.com/janpipek/anonymize_dicom import os import sys import traceback from PyQt5 import QtCore, QtWidgets import pydicom EXAMINATION_CODE = "G318_patient53_bodyPart2_visit0" TAGS_REMOVE = ( "PatientBirthDate", "InstitutionAddress", "OtherPatientIDs", "PerformingPhysicianName", "OperatorsName", "ReferringPhysicianName", "StudyDate", "AcquisitionDate", "ContentDate", "AcquisitionDateTime", "StudyTime", "ContentTime", "SeriesTime", "AcquisitionTime", "AccessionNumber", "StationName", "StudyID", "RequestingService", "RequestedProcedureDescription", # doplneno "PatientSex", #"InstanceCreatorUID", "StudyDescription", #"SeriesDescription", "InstitutionalDepartmentName", "PhysiciansOfRecord", "NameOfPhysicianReadingStudy", "AdmittingDiagnosesDescription", "DerivationDescription", "PatientBirthTime", "OtherPatientNames", "PatientAge", "PatientSize", "PatientWeight", "EthnicGroup", "Occupation", "AdditionalPatientHistory", "PatientComments", "DeviceSerialNumber", "ProtocolName", "ImageComments", #"RequestAttributesSequence" # doplneno po posouzeni vysledku "SeriesDate", "InstanceCreationDate", "InstanceCreationTime" ) TAGS_SUBSTITUTE = { "PatientName": EXAMINATION_CODE, "PatientID": EXAMINATION_CODE, "InstitutionName": "deleted", #"PatientName": "Patient_4", #"PatientID": "G318_patient4_bodyPart1_visit1", #"InstitutionName": "deleted", # doplneno #"StudyInstanceUID": "deleted", #"SeriesInstanceUID": "deleted", #"FrameOfReferenceUID": "deleted", #"SynchronizationFrameOfReferenceUID": "deleted", #"SOPInstanceUID": "deleted", #"ReferencedSOPInstanceUID": "deleted", #"UID": "deleted", #"ContentSequence": "deleted", # nefunguje na 99-PHOENIX, vyresit #"StorageMediaFileSetUID": "deleted", # doplneno, neznamy Type (1, 1C, 2, 2C, 3?) "ReferringPhysicianAddress": "deleted", "ReferringPhysicianTelephoneNumber": "deleted", "MedicalRecordLocator": "deleted", #"ReferencedFrameOfReferenceUID": "deleted", #"RelatedFrameOfReferenceUID": "deleted" # doplneno po posouzeni vysledku "RequestingPhysician": "deleted" } TAGS_KEEP = ( (0x0051, 0x1014), # ?? (RG 2021.05.24), flow direction (0x0018, 0x0094), # kvuli CSA hlavicce (0x0029, 0x1010), # kvuli CSA hlavicce (0x0029, 0x1020), # kvuli CSA hlavicce (0x0021, 0x1019), # kvuli CSA hlavicce (0x5200, 0x9229), # kvuli CSA hlavicce ) def is_dicom_file(path): """Fast way to check whether file is DICOM. :rtype: bool """ if not os.path.isfile(path): return False try: with open(path, "rb") as f: return f.read(132).decode("ASCII")[-4:] == "DICM" except: return False def dicom_files_in_dir(directory="."): """Full paths of all DICOM files in the directory. :rtype: list (str) """ directory = os.path.expanduser(directory) candidates = [os.path.join(directory, f) for f in sorted(os.listdir(directory))] return [f for f in candidates if is_dicom_file(f)] def anonymize(input_filename, output_filename, *, keep_private_tags=False): assert not os.path.exists(output_filename) ds = pydicom.dcmread(input_filename) def _delete_field(field): if field not in ds: return delattr(ds, field) def _update_field(field, value): if field not in ds: return setattr(ds, field, value) kept = {} for field in TAGS_KEEP: if field in ds: kept[field] = ds[field] for field in TAGS_REMOVE: _delete_field(field) for field, value in TAGS_SUBSTITUTE.items(): _update_field(field, value) # Private DICOM tags are not private metadata, see: # http://dicom.nema.org/dicom/2013/output/chtml/part05/sect_7.8.html if not keep_private_tags: ds.remove_private_tags() for field in TAGS_KEEP: if field in kept: ds[field] = kept[field] # create parent directory basedir = os.path.dirname(output_filename) os.makedirs(basedir, exist_ok=True) ds.save_as(output_filename) def anonymize_tree(input_directory, output_directory, *, keep_private_tags=False, update_signal=None): input_directory = os.path.realpath(input_directory) output_directory = os.path.realpath(output_directory) def anonymize_dir(directory): for file in dicom_files_in_dir(directory): output_file = os.path.join(output_directory, os.path.relpath(file, input_directory)) if update_signal is not None: update_signal.emit(os.path.relpath(output_file, start=output_directory)) anonymize(file, output_file, keep_private_tags=keep_private_tags) def walk_dir(directory): for entry in os.scandir(directory): if entry.is_dir(): path = os.path.join(directory, entry.name) anonymize_dir(path) walk_dir(path) anonymize_dir(input_directory) walk_dir(input_directory) class RunThread(QtCore.QThread): """ Runs a function in a thread and alerts the parent when done. Uses a pyqtSignal to alert the main thread of completion. """ finished = QtCore.pyqtSignal(str, name="finished") def __init__(self, func, on_finish, *args, **kwargs): super(RunThread, self).__init__() self.args = args self.kwargs = kwargs self.func = func self.finished.connect(on_finish) self.start() def run(self): try: result = self.func(*self.args, **self.kwargs) except Exception as e: traceback.print_exc() result = e finally: if result is None: self.finished.emit("") else: self.finished.emit(str(result)) class Anonymizer(QtWidgets.QMainWindow): progress_update = QtCore.pyqtSignal(str, name="progress_update") def __init__(self, path=None): super(Anonymizer, self).__init__() self.setWindowTitle("DICOM anonymizer") central = QtWidgets.QWidget() self.setCentralWidget(central) main_layout = QtWidgets.QVBoxLayout() central.setLayout(main_layout) input_widget = QtWidgets.QWidget() main_layout.addWidget(input_widget) input_layout = QtWidgets.QHBoxLayout() input_widget.setLayout(input_layout) input_label = QtWidgets.QLabel("Input:") input_layout.addWidget(input_label) self.input_line = QtWidgets.QLineEdit(path) input_layout.addWidget(self.input_line) input_button = QtWidgets.QPushButton("Open") input_button.clicked.connect(lambda: self.open_directory(which="input")) input_layout.addWidget(input_button) output_widget = QtWidgets.QWidget() main_layout.addWidget(output_widget) output_layout = QtWidgets.QHBoxLayout() output_widget.setLayout(output_layout) output_label = QtWidgets.QLabel("Output:") output_layout.addWidget(output_label) self.output_line = QtWidgets.QLineEdit() output_layout.addWidget(self.output_line) output_button = QtWidgets.QPushButton("Open") output_button.clicked.connect(lambda: self.open_directory(which="output")) output_layout.addWidget(output_button) anonymize_button = QtWidgets.QPushButton("Anonymize") anonymize_button.clicked.connect(self.anonymize) main_layout.addWidget(anonymize_button) self.status_label = QtWidgets.QLabel("") self.statusBar().addPermanentWidget(self.status_label) self._task = None self.progress_update.connect(self.on_progress_update) def open_directory(self, *, which): dialog = QtWidgets.QFileDialog(self) dialog.setFileMode(QtWidgets.QFileDialog.DirectoryOnly) dialog.setViewMode(QtWidgets.QFileDialog.List) dialog.setOption(QtWidgets.QFileDialog.ShowDirsOnly, True) if dialog.exec_(): directory = str(dialog.selectedFiles()[0]) if which == "input": self.input_line.setText(directory) elif which == "output": self.output_line.setText(directory) else: raise Exception("invalid which='{}'".format(which)) self.status_label.setText("") def anonymize(self): input_directory = self.input_line.text() if not os.path.isdir(input_directory): self.status_label.setText("Input directory is invalid.") return #######################x if not os.path.exists(input_directory): self.status_label.setText("Input directory is invalid.") return # check that it is a directory if not os.path.isdir(input_directory): self.status_label.setText("Input directory is invalid.") return # check that the directory is empty if not os.listdir(input_directory): self.status_label.setText("Input directory is empty.") return ###############xx output_directory = self.output_line.text() # create if it does not exist if not os.path.exists(output_directory): os.makedirs(output_directory) # check that it is a directory if not os.path.isdir(output_directory): self.status_label.setText("Output directory is invalid.") return # check that the directory is empty if os.listdir(output_directory): self.status_label.setText("Output directory is not empty.") return if os.path.realpath(output_directory) == os.path.realpath(input_directory): self.status_label.setText("The output directory cannot be the same as the input directory.") return if os.path.dirname(os.path.realpath(output_directory)).startswith(os.path.realpath(input_directory)): self.status_label.setText("The output directory cannot be a subdirectory of the input directory.") return # run asynchronous task self.task = RunThread(anonymize_tree, self.on_task_finished, input_directory, output_directory, update_signal=self.progress_update) def on_task_finished(self, result): self.status_label.setAlignment(QtCore.Qt.AlignLeft) if not result: self.status_label.setText("Anonymized!") else: self.status_label.setText("Error: {}".format(result)) self.task = None def on_progress_update(self, item): self.status_label.setAlignment(QtCore.Qt.AlignRight) self.status_label.setText(item) def main(): if len(sys.argv) < 2: path = "." else: path = sys.argv[1] if not os.path.isdir(path): path = "." path = os.path.abspath(path) app = QtWidgets.QApplication(sys.argv) QtCore.QCoreApplication.setApplicationName("DICOM anonymizer") QtCore.QCoreApplication.setOrganizationName("MMG FJFI") anonymizer = Anonymizer(path) anonymizer.show() sys.exit(app.exec_()) if __name__ == "__main__": main() Loading
DICOM_anonymizer.py 0 → 100644 +359 −0 Original line number Diff line number Diff line # #! /usr/bin/env python3 # Some references: # - pydicom anonymization tutorial: # https://pydicom.github.io/pydicom/stable/auto_examples/metadata_processing/plot_anonymize.html # - other script: # https://github.com/janpipek/anonymize_dicom import os import sys import traceback from PyQt5 import QtCore, QtWidgets import pydicom EXAMINATION_CODE = "G318_patient53_bodyPart2_visit0" TAGS_REMOVE = ( "PatientBirthDate", "InstitutionAddress", "OtherPatientIDs", "PerformingPhysicianName", "OperatorsName", "ReferringPhysicianName", "StudyDate", "AcquisitionDate", "ContentDate", "AcquisitionDateTime", "StudyTime", "ContentTime", "SeriesTime", "AcquisitionTime", "AccessionNumber", "StationName", "StudyID", "RequestingService", "RequestedProcedureDescription", # doplneno "PatientSex", #"InstanceCreatorUID", "StudyDescription", #"SeriesDescription", "InstitutionalDepartmentName", "PhysiciansOfRecord", "NameOfPhysicianReadingStudy", "AdmittingDiagnosesDescription", "DerivationDescription", "PatientBirthTime", "OtherPatientNames", "PatientAge", "PatientSize", "PatientWeight", "EthnicGroup", "Occupation", "AdditionalPatientHistory", "PatientComments", "DeviceSerialNumber", "ProtocolName", "ImageComments", #"RequestAttributesSequence" # doplneno po posouzeni vysledku "SeriesDate", "InstanceCreationDate", "InstanceCreationTime" ) TAGS_SUBSTITUTE = { "PatientName": EXAMINATION_CODE, "PatientID": EXAMINATION_CODE, "InstitutionName": "deleted", #"PatientName": "Patient_4", #"PatientID": "G318_patient4_bodyPart1_visit1", #"InstitutionName": "deleted", # doplneno #"StudyInstanceUID": "deleted", #"SeriesInstanceUID": "deleted", #"FrameOfReferenceUID": "deleted", #"SynchronizationFrameOfReferenceUID": "deleted", #"SOPInstanceUID": "deleted", #"ReferencedSOPInstanceUID": "deleted", #"UID": "deleted", #"ContentSequence": "deleted", # nefunguje na 99-PHOENIX, vyresit #"StorageMediaFileSetUID": "deleted", # doplneno, neznamy Type (1, 1C, 2, 2C, 3?) "ReferringPhysicianAddress": "deleted", "ReferringPhysicianTelephoneNumber": "deleted", "MedicalRecordLocator": "deleted", #"ReferencedFrameOfReferenceUID": "deleted", #"RelatedFrameOfReferenceUID": "deleted" # doplneno po posouzeni vysledku "RequestingPhysician": "deleted" } TAGS_KEEP = ( (0x0051, 0x1014), # ?? (RG 2021.05.24), flow direction (0x0018, 0x0094), # kvuli CSA hlavicce (0x0029, 0x1010), # kvuli CSA hlavicce (0x0029, 0x1020), # kvuli CSA hlavicce (0x0021, 0x1019), # kvuli CSA hlavicce (0x5200, 0x9229), # kvuli CSA hlavicce ) def is_dicom_file(path): """Fast way to check whether file is DICOM. :rtype: bool """ if not os.path.isfile(path): return False try: with open(path, "rb") as f: return f.read(132).decode("ASCII")[-4:] == "DICM" except: return False def dicom_files_in_dir(directory="."): """Full paths of all DICOM files in the directory. :rtype: list (str) """ directory = os.path.expanduser(directory) candidates = [os.path.join(directory, f) for f in sorted(os.listdir(directory))] return [f for f in candidates if is_dicom_file(f)] def anonymize(input_filename, output_filename, *, keep_private_tags=False): assert not os.path.exists(output_filename) ds = pydicom.dcmread(input_filename) def _delete_field(field): if field not in ds: return delattr(ds, field) def _update_field(field, value): if field not in ds: return setattr(ds, field, value) kept = {} for field in TAGS_KEEP: if field in ds: kept[field] = ds[field] for field in TAGS_REMOVE: _delete_field(field) for field, value in TAGS_SUBSTITUTE.items(): _update_field(field, value) # Private DICOM tags are not private metadata, see: # http://dicom.nema.org/dicom/2013/output/chtml/part05/sect_7.8.html if not keep_private_tags: ds.remove_private_tags() for field in TAGS_KEEP: if field in kept: ds[field] = kept[field] # create parent directory basedir = os.path.dirname(output_filename) os.makedirs(basedir, exist_ok=True) ds.save_as(output_filename) def anonymize_tree(input_directory, output_directory, *, keep_private_tags=False, update_signal=None): input_directory = os.path.realpath(input_directory) output_directory = os.path.realpath(output_directory) def anonymize_dir(directory): for file in dicom_files_in_dir(directory): output_file = os.path.join(output_directory, os.path.relpath(file, input_directory)) if update_signal is not None: update_signal.emit(os.path.relpath(output_file, start=output_directory)) anonymize(file, output_file, keep_private_tags=keep_private_tags) def walk_dir(directory): for entry in os.scandir(directory): if entry.is_dir(): path = os.path.join(directory, entry.name) anonymize_dir(path) walk_dir(path) anonymize_dir(input_directory) walk_dir(input_directory) class RunThread(QtCore.QThread): """ Runs a function in a thread and alerts the parent when done. Uses a pyqtSignal to alert the main thread of completion. """ finished = QtCore.pyqtSignal(str, name="finished") def __init__(self, func, on_finish, *args, **kwargs): super(RunThread, self).__init__() self.args = args self.kwargs = kwargs self.func = func self.finished.connect(on_finish) self.start() def run(self): try: result = self.func(*self.args, **self.kwargs) except Exception as e: traceback.print_exc() result = e finally: if result is None: self.finished.emit("") else: self.finished.emit(str(result)) class Anonymizer(QtWidgets.QMainWindow): progress_update = QtCore.pyqtSignal(str, name="progress_update") def __init__(self, path=None): super(Anonymizer, self).__init__() self.setWindowTitle("DICOM anonymizer") central = QtWidgets.QWidget() self.setCentralWidget(central) main_layout = QtWidgets.QVBoxLayout() central.setLayout(main_layout) input_widget = QtWidgets.QWidget() main_layout.addWidget(input_widget) input_layout = QtWidgets.QHBoxLayout() input_widget.setLayout(input_layout) input_label = QtWidgets.QLabel("Input:") input_layout.addWidget(input_label) self.input_line = QtWidgets.QLineEdit(path) input_layout.addWidget(self.input_line) input_button = QtWidgets.QPushButton("Open") input_button.clicked.connect(lambda: self.open_directory(which="input")) input_layout.addWidget(input_button) output_widget = QtWidgets.QWidget() main_layout.addWidget(output_widget) output_layout = QtWidgets.QHBoxLayout() output_widget.setLayout(output_layout) output_label = QtWidgets.QLabel("Output:") output_layout.addWidget(output_label) self.output_line = QtWidgets.QLineEdit() output_layout.addWidget(self.output_line) output_button = QtWidgets.QPushButton("Open") output_button.clicked.connect(lambda: self.open_directory(which="output")) output_layout.addWidget(output_button) anonymize_button = QtWidgets.QPushButton("Anonymize") anonymize_button.clicked.connect(self.anonymize) main_layout.addWidget(anonymize_button) self.status_label = QtWidgets.QLabel("") self.statusBar().addPermanentWidget(self.status_label) self._task = None self.progress_update.connect(self.on_progress_update) def open_directory(self, *, which): dialog = QtWidgets.QFileDialog(self) dialog.setFileMode(QtWidgets.QFileDialog.DirectoryOnly) dialog.setViewMode(QtWidgets.QFileDialog.List) dialog.setOption(QtWidgets.QFileDialog.ShowDirsOnly, True) if dialog.exec_(): directory = str(dialog.selectedFiles()[0]) if which == "input": self.input_line.setText(directory) elif which == "output": self.output_line.setText(directory) else: raise Exception("invalid which='{}'".format(which)) self.status_label.setText("") def anonymize(self): input_directory = self.input_line.text() if not os.path.isdir(input_directory): self.status_label.setText("Input directory is invalid.") return #######################x if not os.path.exists(input_directory): self.status_label.setText("Input directory is invalid.") return # check that it is a directory if not os.path.isdir(input_directory): self.status_label.setText("Input directory is invalid.") return # check that the directory is empty if not os.listdir(input_directory): self.status_label.setText("Input directory is empty.") return ###############xx output_directory = self.output_line.text() # create if it does not exist if not os.path.exists(output_directory): os.makedirs(output_directory) # check that it is a directory if not os.path.isdir(output_directory): self.status_label.setText("Output directory is invalid.") return # check that the directory is empty if os.listdir(output_directory): self.status_label.setText("Output directory is not empty.") return if os.path.realpath(output_directory) == os.path.realpath(input_directory): self.status_label.setText("The output directory cannot be the same as the input directory.") return if os.path.dirname(os.path.realpath(output_directory)).startswith(os.path.realpath(input_directory)): self.status_label.setText("The output directory cannot be a subdirectory of the input directory.") return # run asynchronous task self.task = RunThread(anonymize_tree, self.on_task_finished, input_directory, output_directory, update_signal=self.progress_update) def on_task_finished(self, result): self.status_label.setAlignment(QtCore.Qt.AlignLeft) if not result: self.status_label.setText("Anonymized!") else: self.status_label.setText("Error: {}".format(result)) self.task = None def on_progress_update(self, item): self.status_label.setAlignment(QtCore.Qt.AlignRight) self.status_label.setText(item) def main(): if len(sys.argv) < 2: path = "." else: path = sys.argv[1] if not os.path.isdir(path): path = "." path = os.path.abspath(path) app = QtWidgets.QApplication(sys.argv) QtCore.QCoreApplication.setApplicationName("DICOM anonymizer") QtCore.QCoreApplication.setOrganizationName("MMG FJFI") anonymizer = Anonymizer(path) anonymizer.show() sys.exit(app.exec_()) if __name__ == "__main__": main()