TensorFlow Tutorial: Load Custom Image Dataset
Introduction
In this tutorial, we step through the process of loading the FloodNet dataset with TensorFlow in Python.
First, install TensorFlow via pip:
pip install tensorflow
Next, we import the required libraries:
import tensorflow as tf
import os
import csv
Additionally, we create a configuration dictionary whose entries are used throughout the data loading process:
config = {
    "force_load": False,
    "train_response_path": "./path/to/data/train_response.csv",
    "val_response_path": "./path/to/data/val_response.csv",
    "test_response_path": "./path/to/data/test_response.csv",
    "flooded_classes": tf.constant([
        1, # Building-flooded
        3, # Road-flooded
        5, # Water
    ], dtype=tf.uint32),
    "flooded_threshold": 1/4,
}
Data Loading
Let us start by loading the data into a TensorFlow dataset. In this tutorial, we work with the FloodNet-Supervised dataset version 1.0 from BinaLab on GitHub. First, we download the respective zip files and extract them to the desired data location. This yields a folder train with 1445 training instances, a folder val with 450 validation instances, and a folder test with 448 test instances. Each of these folders contains two directories: one holds the actual images, and the other holds the corresponding labels. The labels are themselves images whose pixel values encode the semantic segmentation class. Numbered to match the class IDs used in the configuration above, the classes are: (0) Background, (1) Building Flooded, (2) Building Non-Flooded, (3) Road Flooded, (4) Road Non-Flooded, (5) Water, (6) Tree, (7) Vehicle, (8) Pool, and (9) Grass.
However, since we would like to deal with a binary classification task rather than a more complex image segmentation task, we transform each label image into a single binary response. The response indicates whether the share of flooded pixels exceeds a given threshold (here 25%), which reduces the two-dimensional segmentation labels to one True/False value per image. Note that we consider three of the ten classes as flooded: Building Flooded, Road Flooded, and Water. Also note that we treat the segmentation labels as ground truth. Because the label images are resized on loading, pixel values may become fractional, so we round each value to the nearest class ID.
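To make the thresholding rule concrete, here is a minimal sketch on a toy 4x4 label mask. It uses NumPy instead of TensorFlow purely for brevity; the function name `is_flooded` and the toy masks are illustrative assumptions, not part of the tutorial's code.

```python
import numpy as np

# Class IDs and threshold copied from the config above.
FLOODED_CLASSES = (1, 3, 5)
FLOODED_THRESHOLD = 1 / 4


def is_flooded(mask, classes=FLOODED_CLASSES, threshold=FLOODED_THRESHOLD):
    """Return True if more than `threshold` of the pixels carry a flooded class."""
    # Round fractional values to the nearest class ID before comparing.
    labels = np.rint(np.asarray(mask, dtype=np.float64)).astype(np.int64)
    flooded_fraction = np.isin(labels, classes).mean()
    return bool(flooded_fraction > threshold)


# Hypothetical mask with exactly 4 of 16 flooded pixels (classes 1, 1, 3, 5).
borderline_mask = np.array([
    [0, 1, 1, 9],
    [0, 3, 5, 9],
    [2, 2, 6, 9],
    [4, 4, 6, 9],
])

# Same mask with one more flooded pixel, i.e. 5 of 16.
flooded_mask = borderline_mask.copy()
flooded_mask[2, 0] = 3

print(is_flooded(borderline_mask))  # exactly 25% does not exceed the threshold
print(is_flooded(flooded_mask))     # 31.25% exceeds the threshold
```

The comparison is strict, so an image with exactly 25% flooded pixels is labeled as not flooded.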
Altogether, we implement the following function to load the train, validation, and test label images and transform them to the desired binary response. In addition, we write the resulting binary responses together with the corresponding image filenames to CSV files on disk, so that we do not have to compute the binary response from scratch each time.
def loadResponse(config):
    def isFlooded(img):
        # count the pixels of each flooded class after rounding to the nearest label
        counts = tf.map_fn(
            fn = lambda fc: tf.reduce_sum(tf.cast(tf.equal(tf.cast(
                tf.round(tf.reshape(img, [-1])), tf.uint32), fc), tf.uint32)),
            elems = config["flooded_classes"])
        flooded_count = tf.reduce_sum(counts)
        # return True if the count of flooded pixels exceeds the specified threshold
        return tf.greater(flooded_count, tf.cast(tf.multiply(tf.cast(tf.size(img), tf.float32),
            config["flooded_threshold"]), tf.uint32))
    # read the response label images from disk
    train_labels_dir = "./data/FloodNet-Supervised_v1.0/train/train-label-img/"
    val_labels_dir = "./data/FloodNet-Supervised_v1.0/val/val-label-img/"
    test_labels_dir = "./data/FloodNet-Supervised_v1.0/test/test-label-img/"
    train_labels = tf.keras.utils.image_dataset_from_directory(
        directory = train_labels_dir,
        label_mode = None,
        batch_size = None,
        image_size = (3000, 3000),
        shuffle = False,
        color_mode = "grayscale")
    val_labels = tf.keras.utils.image_dataset_from_directory(
        directory = val_labels_dir,
        label_mode = None,
        batch_size = None,
        image_size = (3000, 3000),
        shuffle = False,
        color_mode = "grayscale")
    test_labels = tf.keras.utils.image_dataset_from_directory(
        directory = test_labels_dir,
        label_mode = None,
        batch_size = None,
        image_size = (3000, 3000),
        shuffle = False,
        color_mode = "grayscale")
    # derive the lookup key from each label filename (the part before the first "_")
    train_fnames = map(lambda fp: os.path.basename(fp).split("_")[0],
        train_labels.file_paths)
    val_fnames = map(lambda fp: os.path.basename(fp).split("_")[0],
        val_labels.file_paths)
    test_fnames = map(lambda fp: os.path.basename(fp).split("_")[0],
        test_labels.file_paths)
    # create the response (True/False) from the label images
    train_response = map(lambda tb: tb.numpy(), train_labels.map(isFlooded))
    val_response = map(lambda tb: tb.numpy(), val_labels.map(isFlooded))
    test_response = map(lambda tb: tb.numpy(), test_labels.map(isFlooded))
    # shuffle = False keeps the datasets in file order, so zip pairs each key with its response
    train_response = dict(zip(train_fnames, train_response))
    val_response = dict(zip(val_fnames, val_response))
    test_response = dict(zip(test_fnames, test_response))
    # write the computed binary responses to disk for later usage
    # (newline="" prevents the csv module from emitting blank rows on Windows)
    with open(config["train_response_path"], "w", newline="") as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(["fname", "response"])
        for key, val in train_response.items():
            writer.writerow([key, val])
    with open(config["val_response_path"], "w", newline="") as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(["fname", "response"])
        for key, val in val_response.items():
            writer.writerow([key, val])
    with open(config["test_response_path"], "w", newline="") as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(["fname", "response"])
        for key, val in test_response.items():
            writer.writerow([key, val])
    print(f'Wrote {len(train_response)} training labels, {len(val_response)} '
        + f'validation labels, and {len(test_response)} test labels to disk.')
    return train_response, val_response, test_response
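The dictionary keys in loadResponse come from the label filenames, while getDataset later derives its lookup keys from the image filenames. The sketch below checks that both derivations agree. The example paths and the helper names `label_key` and `image_key` are hypothetical and only assume that a label file shares its leading stem with its image, followed by an underscore suffix.

```python
import os


def label_key(path):
    """Key used for label images: the filename part before the first underscore."""
    return os.path.basename(path).split("_")[0]


def image_key(path):
    """Key used for original images: the filename without its extension."""
    return os.path.splitext(os.path.basename(path))[0]


# Hypothetical file paths, chosen only to illustrate the naming assumption.
label_paths = ["./labels/6651_lab.png", "./labels/7301_lab.png"]
image_paths = ["./images/6651.jpg", "./images/7301.jpg", "./images/9999.jpg"]

label_keys = [label_key(p) for p in label_paths]
image_keys = [image_key(p) for p in image_paths]

# Images without a matching label would raise a KeyError during the lookup,
# so it can help to list them up front.
missing = sorted(set(image_keys) - set(label_keys))

print(label_keys)
print(missing)
```

Note that splitting on the first underscore truncates any stem that itself contains an underscore, so this scheme relies on underscore-free image names.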
Since we store the binary responses on disk, we can avoid redundant transformations of the label images by reading the responses directly instead of re-computing them. To this end, we implement the following function, which reads the previously computed binary responses from the stored CSV files:
def readResponse(config):
    train_response = dict()
    with open(config["train_response_path"], "r") as csvfile:
        reader = csv.reader(csvfile)
        next(reader) # omit the header
        for row in reader:
            train_response[row[0]] = (row[1] == "True")
    val_response = dict()
    with open(config["val_response_path"], "r") as csvfile:
        reader = csv.reader(csvfile)
        next(reader) # omit the header
        for row in reader:
            val_response[row[0]] = (row[1] == "True")
    test_response = dict()
    with open(config["test_response_path"], "r") as csvfile:
        reader = csv.reader(csvfile)
        next(reader) # omit the header
        for row in reader:
            test_response[row[0]] = (row[1] == "True")
    print(f'Read {len(train_response)} training labels, {len(val_response)} '
        + f'validation labels, and {len(test_response)} test labels from disk.')
    return train_response, val_response, test_response
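The write and read logic can be checked in isolation with a small round trip. The following sketch uses only the standard library; the helper names `write_response` and `read_response`, the toy filenames, and the temporary file are assumptions made for illustration.

```python
import csv
import os
import tempfile


def write_response(path, response):
    """Write a {filename: bool} mapping as a two-column CSV file with a header."""
    with open(path, "w", newline="") as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(["fname", "response"])
        for fname, flooded in response.items():
            writer.writerow([fname, flooded])


def read_response(path):
    """Read the CSV file back into a {filename: bool} mapping."""
    response = {}
    with open(path, "r", newline="") as csvfile:
        reader = csv.reader(csvfile)
        next(reader)  # omit the header
        for fname, flooded in reader:
            # The csv module stores booleans as the strings "True" and "False".
            response[fname] = (flooded == "True")
    return response


original = {"6651": True, "7301": False}
with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "toy_response.csv")
    write_response(path, original)
    restored = read_response(path)

print(restored == original)
```

Comparing against the string "True" matters here: calling bool() on the stored text would treat "False" as truthy, since any non-empty string is.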
Using these two functions, we can either compute our binary response labels from the label images or read the pre-computed labels from disk. Hence, we can now focus on loading the actual image data and constructing a dataset that contains both the image and the corresponding binary label. To do so, we perform the following steps:
- Load the images from disk (similar to loading the label images before)
- Obtain the response
- Match the images to the corresponding labels using their file names
- Zip the two datasets into a single dataset of pairs
def getDataset(config):
    train_dir = "./data/FloodNet-Supervised_v1.0/train/train-org-img/"
    val_dir = "./data/FloodNet-Supervised_v1.0/val/val-org-img/"
    test_dir = "./data/FloodNet-Supervised_v1.0/test/test-org-img/"
    # load the image dataset
    train = tf.keras.utils.image_dataset_from_directory(
        directory = train_dir,
        label_mode = None,
        batch_size = None,
        image_size = (3000, 3000),
        shuffle = False,
        color_mode = "rgb")
    val = tf.keras.utils.image_dataset_from_directory(
        directory = val_dir,
        label_mode = None,
        batch_size = None,
        image_size = (3000, 3000),
        shuffle = False,
        color_mode = "rgb")
    test = tf.keras.utils.image_dataset_from_directory(
        directory = test_dir,
        label_mode = None,
        batch_size = None,
        image_size = (3000, 3000),
        shuffle = False,
        color_mode = "rgb")
    # obtain the filenames (without extension) in dataset order
    train_fnames = map(lambda fp: os.path.splitext(os.path.basename(fp))[0],
        train.file_paths)
    val_fnames = map(lambda fp: os.path.splitext(os.path.basename(fp))[0],
        val.file_paths)
    test_fnames = map(lambda fp: os.path.splitext(os.path.basename(fp))[0],
        test.file_paths)
    # read the cached response if all files exist, otherwise compute it
    if (not config["force_load"] and os.path.exists(config["train_response_path"])
            and os.path.exists(config["val_response_path"])
            and os.path.exists(config["test_response_path"])):
        train_response, val_response, test_response = readResponse(config)
    else:
        train_response, val_response, test_response = loadResponse(config)
    # create list of responses in the same order as the image dataset
    train_labels = [[train_response[fname]] for fname in train_fnames]
    val_labels = [[val_response[fname]] for fname in val_fnames]
    test_labels = [[test_response[fname]] for fname in test_fnames]
    # add the responses to the dataset
    train = tf.data.Dataset.zip((train, tf.data.Dataset.from_tensor_slices(tf.constant(train_labels))))
    val = tf.data.Dataset.zip((val, tf.data.Dataset.from_tensor_slices(tf.constant(val_labels))))
    test = tf.data.Dataset.zip((test, tf.data.Dataset.from_tensor_slices(tf.constant(test_labels))))
    print(f'Found {train.cardinality().numpy()} train instances, {val.cardinality().numpy()} '
        + f'validation instances, and {test.cardinality().numpy()} test instances.')
    return train, val, test
As a result, we obtain separate datasets for training, validation, and testing, each pairing the respective images with the corresponding binary labels.
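The read-or-compute decision inside getDataset can also be viewed as a small, reusable pattern. The sketch below is one possible way to isolate it, not the tutorial's implementation; `load_or_compute`, the toy cache file, and the returned status strings are hypothetical.

```python
import os
import tempfile


def load_or_compute(cache_paths, read_fn, compute_fn, force=False):
    """Read cached results if every cache file exists, otherwise recompute."""
    if not force and all(os.path.exists(p) for p in cache_paths):
        return read_fn(), "read"
    return compute_fn(), "computed"


with tempfile.TemporaryDirectory() as tmp_dir:
    cache_path = os.path.join(tmp_dir, "toy_response.txt")

    def compute():
        # Stand-in for the expensive pass over the label images; fills the cache.
        with open(cache_path, "w") as f:
            f.write("True")
        return True

    def read():
        with open(cache_path) as f:
            return f.read() == "True"

    first = load_or_compute([cache_path], read, compute)               # no cache yet
    second = load_or_compute([cache_path], read, compute)              # cache exists
    forced = load_or_compute([cache_path], read, compute, force=True)  # like force_load

print(first, second, forced)
```

Here the `force` flag plays the role of config["force_load"]: it triggers a recomputation even when all cache files are present.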
# obtain the dataset (either load or compute the response labels)
train, val, test = getDataset(config)
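Before training, such a dataset of (image, label) pairs is typically preprocessed and batched. The following sketch illustrates this on a small synthetic stand-in with the same element structure, so it runs without the FloodNet files. The image sizes, the batch size, and the `preprocess` function are illustrative assumptions.

```python
import tensorflow as tf

# Synthetic stand-in for one split returned by getDataset: four small RGB
# images, each paired with a boolean label of shape (1,).
images = tf.data.Dataset.from_tensor_slices(tf.zeros([4, 8, 8, 3], dtype=tf.float32))
labels = tf.data.Dataset.from_tensor_slices(
    tf.constant([[True], [False], [True], [False]]))
toy_train = tf.data.Dataset.zip((images, labels))


def preprocess(image, label):
    # Downscale the image, scale pixel values to [0, 1], and cast the label
    # to float so that it can be used with a binary cross-entropy loss.
    image = tf.image.resize(image, (4, 4)) / 255.0
    return image, tf.cast(label, tf.float32)


batched = (toy_train
           .map(preprocess, num_parallel_calls=tf.data.AUTOTUNE)
           .batch(2)
           .prefetch(tf.data.AUTOTUNE))

for image_batch, label_batch in batched.take(1):
    print(image_batch.shape, label_batch.shape)
```

With the real data, downscaling in the map step is worth considering, since a single 3000x3000 RGB image stored as float32 occupies roughly 108 MB.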