# File size: 18,395 Bytes
# 3d90a2e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
# Importing necessary libraries
import io
import os
import cv2
import utils
import random
import zipfile
import numpy as np
import pandas as pd
from PIL import Image
import streamlit as st
import albumentations as A


# Function to check if the uploaded images and labels are valid
# NOTE(review): the call below is a bare expression, not a decorator (there is
# no leading "@"), so its result is discarded and check_valid_labels is not
# cached. Confirm whether caching was ever intended before changing it.
st.cache_resource(show_spinner=False)


def check_valid_labels(uploaded_files):
    """Validate the uploaded files and pair every image with its label file.

    Files are split by MIME type: "image/jpeg" and "image/png" are images,
    "text/plain" are labels, anything else is ignored. Files are keyed by their
    name without extension, so an image and its label must share that name.

    Args:
        uploaded_files: Sequence of uploaded file objects exposing ``name``,
            ``type`` and ``seek``. May be empty or None.

    Returns:
        A 5-tuple ``(is_valid, image_files, label_files, first_image,
        first_label)``. On failure it is ``(False, {}, {}, None, None)`` and a
        warning is shown. When only images are uploaded, ``label_files`` is
        ``{}`` and ``first_label`` is None.
    """
    # Early exit if no files are uploaded (also covers a missing upload list)
    if not uploaded_files:
        st.warning(
            "Please upload at least one image to apply image processing.", icon="⚠️"
        )
        return False, {}, {}, None, None

    # Dictionaries holding the accepted images and labels, keyed by file stem
    image_files, label_files = {}, {}

    # Raw counters; they exceed the dictionary sizes when stems are duplicated
    image_count, label_count = 0, 0

    # Extracting the name of the first file (preferred preview file)
    first_file_name = os.path.splitext(uploaded_files[0].name)[0]

    # Initialize a progress bar and progress text
    progress_bar = st.progress(0)
    progress_text = st.empty()
    total_files = len(uploaded_files)

    try:
        # Categorize and prepare uploaded files
        for position, file in enumerate(uploaded_files, start=1):
            file.seek(0)  # Reset file pointer to ensure proper file reading
            file_stem = os.path.splitext(file.name)[0]

            # Distribute files into image or label categories based on their file type
            if file.type in ("image/jpeg", "image/png"):
                image_files[file_stem] = file
                image_count += 1
            elif file.type == "text/plain":
                label_files[file_stem] = file
                label_count += 1

            # Update progress bar and display current progress
            progress_bar.progress(position / total_files)
            progress_text.text(f"Validating file {position} of {total_files}")
    finally:
        # Always remove the progress widgets, even if a file could not be read
        progress_bar.empty()
        progress_text.empty()

    # A stem seen twice overwrote its dictionary entry, so sizes and counters differ
    if (len(image_files) != image_count) or (len(label_files) != label_count):
        # Warn the user about the presence of duplicate file names
        st.warning(
            "Duplicate file names detected. Please ensure each image and label has a unique name.",
            icon="⚠️",
        )
        return False, {}, {}, None, None

    if not image_files:
        # Warn if no images are uploaded
        st.warning("Please upload an image to apply image processing.", icon="⚠️")
        return False, {}, {}, None, None

    # The first uploaded file may have been skipped (unsupported MIME type), in
    # which case its stem is not a key; fall back to the first accepted image
    # instead of failing with a KeyError.
    if first_file_name in image_files:
        preview_name = first_file_name
    else:
        preview_name = next(iter(image_files))

    if not label_files:
        # Inform the user if only images are uploaded without labels
        st.info(
            f"Note: {len(image_files)} images uploaded without labels. Label type and class labels will be ignored in this case.",
            icon="✅",
        )
        return True, image_files, {}, image_files[preview_name], None

    if len(image_files) != len(label_files):
        # Warn if the count of images and labels does not match
        st.warning(
            "Count Mismatch: The number of uploaded images and labels does not match.",
            icon="⚠️",
        )
        return False, {}, {}, None, None

    if image_files.keys() != label_files.keys():
        # Warn if there is a mismatch in file names between images and labels
        st.warning(
            "Mismatch detected: Some images do not have corresponding label files.",
            icon="⚠️",
        )
        return False, {}, {}, None, None

    # Same count and same stems: every image has exactly one label
    st.info(
        f"Validated: {len(image_files)} images and labels successfully matched.",
        icon="✅",
    )
    return (
        True,
        image_files,
        label_files,
        image_files[preview_name],
        label_files[preview_name],
    )


# Function to apply an image processing technique to an image and return any errors along with the processed image
def apply_and_test_image_processing(
    image_processing, params, image, allowed_image_types
):
    """Try a single image processing technique on ``image``.

    Args:
        image_processing: Name of the technique, passed to
            ``utils.apply_albumentation`` and used in warning messages.
        params: Parameters for the technique.
        image: Input image exposing ``dtype`` and ``shape``.
        allowed_image_types: Image types the technique accepts; forwarded to
            ``utils.is_image_type_allowed`` and listed in the warning.

    Returns:
        ``(error_occurred, processed_image)``: ``(False, image)`` on success,
        ``(True, None)`` when the type check fails or any exception is raised
        (a Streamlit warning is shown in both failure cases).
    """
    try:
        # Describe the input: dtype plus channel count (1 when there is no third axis)
        pixel_type = image.dtype
        if len(image.shape) == 3:
            channel_count = image.shape[2]
        else:
            channel_count = 1

        type_is_allowed = utils.is_image_type_allowed(
            pixel_type, channel_count, allowed_image_types
        )
        if not type_is_allowed:
            # List the acceptable image types in the warning shown to the user
            readable_types = ", ".join(str(item) for item in allowed_image_types)
            st.warning(
                f"Error applying {image_processing}: Incompatible image type. The input image should be one of the following types: {readable_types}",
                icon="⚠️",
            )
            return True, None  # Error occurred

        # Fixed seed so repeated previews of the same settings match
        random.seed(0)

        # Wrap the single technique in a pipeline and run it
        single_step = utils.apply_albumentation(params, image_processing)
        outcome = A.Compose([single_step])(image=image)

        return False, outcome["image"]  # No error
    except Exception as error:
        st.warning(f"Error applying {image_processing}: {error}", icon="⚠️")
        return True, None  # Error occurred


# Function to generate a DataFrame detailing image processing technique parameters and descriptions
def create_image_processings_dataframe(
    image_processings_params, image_processing_params_db
):
    """Build a table of the selected techniques, their parameters and descriptions.

    Args:
        image_processings_params: Mapping of technique name to a mapping of
            parameter name to parameter value.
        image_processing_params_db: DataFrame with the columns "Name",
            "Parameter Name" and "Parameter Description".

    Returns:
        A DataFrame with the columns "image_processing", "Parameter", "Value"
        and "Description", one row per (technique, parameter) pair in input
        order. Parameters missing from the database get the description
        "Description not available".
    """
    rows = []
    for aug_name, params in image_processings_params.items():
        # Filter the database once per technique instead of once per parameter
        technique_info = image_processing_params_db[
            image_processing_params_db["Name"] == aug_name
        ]

        for param_name, param_value in params.items():
            param_info = technique_info[
                technique_info["Parameter Name"] == param_name
            ]

            # Use the first matching description, or a placeholder when absent
            if param_info.empty:
                param_description = "Description not available"
            else:
                param_description = param_info["Parameter Description"].iloc[0]

            rows.append([aug_name, param_name, param_value, param_description])

    # Create the DataFrame from the accumulated rows
    return pd.DataFrame(
        rows, columns=["image_processing", "Parameter", "Value", "Description"]
    )


# Function to generate python code for images and labels
def generate_python_code_images_labels(
    augmentations_params,
    num_variations=1,
    include_original=False,
):
    """Generate a standalone Python script that batch-processes images and labels.

    The script builds an Albumentations pipeline from ``augmentations_params``,
    applies it ``num_variations`` times to every .png/.jpg/.jpeg file in the
    input directory and copies the matching ``.txt`` label next to each output.

    Args:
        augmentations_params: Mapping of Albumentations transform name to a
            mapping of keyword argument name to value.
        num_variations: Number of processed variations generated per image.
        include_original: When True, the script also copies each original
            image (and its label, if present) to the output directory.

    Returns:
        The generated script as a string.
    """

    def _format_value(value):
        # Strings must be quoted to be valid Python in the generated script;
        # every other value keeps its plain str() rendering.
        return repr(value) if isinstance(value, str) else str(value)

    lines = [
        "# Importing necessary libraries",
        "import os",
        "import cv2",
        "import shutil",
        "import albumentations as A",
        "",
        "# Define the paths for input and output directories",
        "input_directory = 'path/to/input'",
        "output_directory = 'path/to/output'",
        "",
        "# Function to create an augmentation pipeline using Albumentations",
        "def process_image(image):",
        "    # Define the sequence of augmentation techniques",
        "    pipeline = A.Compose([",
    ]

    # One transform call per selected technique
    for technique, params in augmentations_params.items():
        arguments = ", ".join(
            f"{name}={_format_value(value)}" for name, value in params.items()
        )
        lines.append(f"        A.{technique}({arguments}),")

    lines += [
        "    ])",
        "    # Apply the augmentation pipeline",
        "    return pipeline(image=image)['image']",
        "",
        "# Function to process a batch of images",
        "def process_batch(input_directory, output_directory):",
        "    # Create the output directory if it does not exist yet",
        "    os.makedirs(output_directory, exist_ok=True)",
        "",
        "    for filename in os.listdir(input_directory):",
        "        if filename.lower().endswith(('.png', '.jpg', '.jpeg')):",
        "            image_path = os.path.join(input_directory, filename)",
        "            label_path = os.path.splitext(image_path)[0] + '.txt'",
        "",
        "            # Read the image",
        "            image = cv2.imread(image_path)",
        "            if image is None:",
        "                # Skip files that OpenCV cannot decode",
        "                continue",
        "",
    ]

    # Include original image and label logic
    if include_original:
        lines += [
            "            # Include original image and label",
            "            shutil.copy2(image_path, output_directory)",
            "            if os.path.exists(label_path):",
            "                shutil.copy2(label_path, output_directory)",
            "",
        ]

    # Generate variations for each image and process them
    lines += [
        "            # Generate variations for each image and process them",
        f"            for variation in range({num_variations}):",
        "                processed_image = process_image(image)",
        "",
        "                # Save the processed image",
        # Plain string on purpose: the braces belong to the generated f-string
        "                output_filename = f'processed_{os.path.splitext(filename)[0]}_{variation}{os.path.splitext(filename)[1]}'",
        "                cv2.imwrite(os.path.join(output_directory, output_filename), processed_image)",
        "",
        "                # Save the original label file for the processed image",
        "                if os.path.exists(label_path):",
        "                    shutil.copy2(label_path, os.path.join(output_directory, os.path.splitext(output_filename)[0] + '.txt'))",
        "",
        "# Execute the batch processing function with the specified parameters",
        "process_batch(input_directory, output_directory)",
    ]

    return "\n".join(lines) + "\n"


# Function to create an image processing pipeline based on the selected techniques and their parameters
def create_image_processing_pipeline(
    selected_image_processings, image_processing_params
):
    """Compose the selected techniques into a single Albumentations transform.

    Args:
        selected_image_processings: Iterable of technique names, in the order
            they should be applied.
        image_processing_params: Mapping of technique name to its parameters;
            it must contain every selected name.

    Returns:
        An ``A.Compose`` built from one ``utils.apply_albumentation`` result
        per selected technique.
    """
    # One transform per selected technique, keeping the selection order
    steps = [
        utils.apply_albumentation(image_processing_params[name], name)
        for name in selected_image_processings
    ]

    # Compose all the image processings into one transformation
    return A.Compose(steps)


# Function to process images and labels, apply image processing techniques, and create a zip file with the results
@st.cache_resource(show_spinner=False)
def process_images_and_labels(
    image_files,
    label_files,
    selected_image_processings,
    _image_processings_params,
    num_variations,
    include_original,
):
    """Process every uploaded image and store the results in the session state.

    Each image is decoded, run ``num_variations`` times through the pipeline
    built from the selected techniques, and written as JPEG to an in-memory
    zip archive. The label file of an image, if any, is copied unchanged next
    to every processed variation.

    Args:
        image_files: Mapping of image name (without extension) to the uploaded
            image file object.
        label_files: Mapping of image name (without extension) to the uploaded
            label file object; images without an entry have no label.
        selected_image_processings: Names of the techniques to apply, in order.
        _image_processings_params: Mapping of technique name to its parameters.
        num_variations: Number of processed variations per image.
        include_original: When True, the uploaded image bytes and label are
            also added to the archive.

    Returns:
        None. Results are written to ``st.session_state`` under the keys
        "image_repository_preprocessing", "processed_image_mapping_procesing",
        "unique_images_names" and "zip_data_processing".
    """
    # NOTE(review): this function returns None and reports only through
    # st.session_state, yet it is wrapped in st.cache_resource. If a cache hit
    # skips the body, the session state is not refreshed - confirm this is the
    # intended behavior.
    zip_buffer = io.BytesIO()  # Create an in-memory buffer for the zip file
    # Repository of resized images (original and processed) with their label file
    st.session_state["image_repository_preprocessing"] = {}
    # Map original file names to the names of their processed versions
    st.session_state["processed_image_mapping_procesing"] = {}
    st.session_state["unique_images_names"] = []  # List to store unique images names

    # Create progress bar and text elements in Streamlit
    progress_bar = st.progress(0)
    progress_text = st.empty()

    try:
        with zipfile.ZipFile(
            zip_buffer, mode="a", compression=zipfile.ZIP_DEFLATED, allowZip64=True
        ) as zip_file:
            # Compose all the image processings into one transformation
            transform = create_image_processing_pipeline(
                selected_image_processings, _image_processings_params
            )

            total_images = len(image_files) * num_variations
            processed_count = 0  # Counter for processed images

            # Iterate over each uploaded file
            for image_name, image_file in image_files.items():
                image_file.seek(0)  # Reset file pointer to start
                raw_image_bytes = image_file.read()
                file_bytes = np.asarray(bytearray(raw_image_bytes), dtype=np.uint8)
                original_image = cv2.cvtColor(
                    cv2.imdecode(file_bytes, cv2.IMREAD_COLOR), cv2.COLOR_BGR2RGB
                )
                original_image_resized = utils.resize_image(original_image)

                # Read the label once per image; it is reused for every variation
                label_file = label_files.get(image_name)
                label_bytes = None
                if label_file is not None:
                    label_file.seek(0)  # Reset the file pointer
                    label_bytes = label_file.read()

                # Include original images and labels in the output if selected
                if include_original:
                    # Store the uploaded bytes untouched so the content matches
                    # the file extension (re-encoding to JPEG would put JPEG
                    # data under a .png name)
                    zip_file.writestr(image_file.name, raw_image_bytes)
                    if label_bytes is not None:
                        zip_file.writestr(f"{image_name}.txt", label_bytes)

                original_file_name = image_file.name
                st.session_state["unique_images_names"].append(original_file_name)
                st.session_state["processed_image_mapping_procesing"][
                    original_file_name
                ] = []
                st.session_state["image_repository_preprocessing"][
                    original_file_name
                ] = {
                    "image": original_image_resized,
                    "label": label_file,
                }

                # Apply image processing techniques and generate variations
                for i in range(num_variations):
                    # Seed with the variation index so each variation is repeatable
                    random.seed(i)

                    # Apply the image processing pipeline to the image
                    processed_image = transform(image=original_image)["image"]

                    img_buffer = io.BytesIO()
                    Image.fromarray(processed_image).save(img_buffer, format="JPEG")
                    # Image and label share the full stem so they stay paired
                    # even when the stem itself contains dots
                    processed_stem = f"processed_{image_name}_{i}"
                    processed_filename = f"{processed_stem}.jpg"
                    zip_file.writestr(processed_filename, img_buffer.getvalue())
                    processed_image_resized = utils.resize_image(processed_image)

                    # Save corresponding label file to zip if it exists
                    if label_bytes is not None:
                        zip_file.writestr(f"{processed_stem}.txt", label_bytes)

                    st.session_state["processed_image_mapping_procesing"][
                        original_file_name
                    ].append(processed_filename)
                    st.session_state["image_repository_preprocessing"][
                        processed_filename
                    ] = {
                        "image": processed_image_resized,
                        "label": label_file,
                    }

                    processed_count += 1
                    # Update progress bar and text
                    progress_bar.progress(processed_count / total_images)
                    progress_text.text(
                        f"Processing image {processed_count} of {total_images}"
                    )
    finally:
        # Remove the progress bar and text, whether processing finished or failed
        progress_bar.empty()
        progress_text.empty()

    # getvalue() returns the whole buffer regardless of the current position
    st.session_state["zip_data_processing"] = zip_buffer.getvalue()


# Function to display the generated code and offer it as a downloadable file
def display_code_and_download_button(generated_code):
    """Show the generated script in an expander together with a download button.

    Inside the "Plug and Play Code" expander a two-column row holds the section
    heading (left) and the download button (right); the usage notes and the
    script itself are rendered below that row.

    Args:
        generated_code: The script to display; it is offered for download as
            UTF-8 encoded bytes.
    """
    # Markdown fragments, kept verbatim
    heading_markdown = """

            ### Description of the Code Pipeline

            """
    description_markdown = """

            <div style='text-align: justify;'>

            This code is a ready-to-use Python script for batch augmentation. It applies selected augmentation techniques to all images in a specified input directory and saves the processed images in an output directory.



            **To use this script:**

            - Ensure you have the necessary dependencies installed.

            - Specify the input and output paths: Replace `'path/to/input'` with the path to your input images and `'path/to/output'` with the desired path for the processed images.

            - The number of processed variations per image, the inclusion of the original images in the output, and the processing techniques with their parameters will be automatically set based on your selections.



            ### Python Code

            </div>

            """

    with st.expander("Plug and Play Code"):
        heading_column, download_column = st.columns([7, 3])

        # Left column: section heading only
        with heading_column:
            st.markdown(heading_markdown)

        # Below the column row: usage notes followed by the script
        st.markdown(description_markdown, unsafe_allow_html=True)
        st.code(generated_code, language="python")

        # Right column: download of the script as UTF-8 bytes
        with download_column:
            st.download_button(
                label="Download Python File",
                data=generated_code.encode("utf-8"),
                file_name="image_processing_script.py",
                mime="text/plain",
                use_container_width=True,
            )