Training your model
In the previous post we exported the data to Google Cloud Storage. In this post we will read that data and use it to train a model. There are many examples that use Colab; here we focus on training the model on your own PC or a server.
Download the data from the GCP bucket.
Use gsutil to copy the files to a folder of your choice on your PC:
gsutil -m cp -r gs://wfpbucket/l8Records myoutput/data/folder
Install the TensorFlow Python package. Note that you can install tensorflow-gpu if your system has a powerful GPU. You will also need to install the NVIDIA drivers if you would like to take advantage of the GPU.
pip install tensorflow
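If you installed the GPU build, you can quickly confirm that TensorFlow actually sees the GPU with the short check below (an empty list means training will fall back to the CPU):

import tensorflow as tf

# Lists the GPUs TensorFlow can use; an empty list means training runs on the CPU.
print(tf.config.list_physical_devices('GPU'))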
Copy and paste the model code below and set the correct paths.
# Tensorflow setup.
import tensorflow as tf
print(tf.__version__)
from tensorflow.python.keras import layers
from tensorflow.python.keras import losses
from tensorflow.python.keras import models
from tensorflow.python.keras import metrics
from tensorflow.python.keras import optimizers
def parse_tfrecord(example_proto):
  """The parsing function.
  Read a serialized example into the structure defined by FEATURES_DICT.
  Args:
    example_proto: a serialized Example.
  Returns:
    A dictionary of tensors, keyed by feature name.
  """
  return tf.io.parse_single_example(example_proto, FEATURES_DICT)
def to_tuple(inputs):
  """Function to convert a dictionary of tensors to a tuple of (inputs, outputs).
  Turn the tensors returned by parse_tfrecord into a stack in HWC shape.
  Args:
    inputs: A dictionary of tensors, keyed by feature name.
  Returns:
    A tuple of (inputs, outputs).
  """
  inputsList = [inputs.get(key) for key in FEATURES]
  stacked = tf.stack(inputsList, axis=0)
  # Convert from CHW to HWC
  stacked = tf.transpose(stacked, [1, 2, 0])
  return stacked[:, :, :len(BANDS)], stacked[:, :, len(BANDS):]
def get_dataset(pattern):
  """Function to read, parse and format to tuple a set of input tfrecord files.
  Get all the files matching the pattern, parse and convert to tuple.
  Args:
    pattern: A file pattern to match (a local path or a Cloud Storage bucket).
  Returns:
    A tf.data.Dataset
  """
  glob = tf.io.gfile.glob(pattern)
  dataset = tf.data.TFRecordDataset(glob, compression_type='GZIP')
  dataset = dataset.map(parse_tfrecord, num_parallel_calls=5)
  dataset = dataset.map(to_tuple, num_parallel_calls=5)
  return dataset
def get_training_dataset(glob, eval=True):
  """Get a preprocessed, batched dataset.
  Args:
    glob: A file pattern to match.
    eval: If True, shuffle, batch and repeat indefinitely (for use while fitting
      the model); if False, shuffle and batch once (for a single evaluation pass).
  Returns:
    A batched tf.data.Dataset.
  """
  dataset = get_dataset(glob)
  if eval:
    dataset = dataset.shuffle(BUFFER_SIZE).batch(BATCH_SIZE).repeat()
  else:
    dataset = dataset.shuffle(BUFFER_SIZE).batch(BATCH_SIZE)
  return dataset
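# U-Net building blocks: conv_block applies two 3x3 convolutions, each followed by
# batch normalization and ReLU; encoder_block adds 2x2 max pooling to downsample;
# decoder_block upsamples with a transposed convolution, concatenates the matching
# encoder feature map (the skip connection) and applies two more convolutions.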
def conv_block(input_tensor, num_filters):
  encoder = layers.Conv2D(num_filters, (3, 3), padding='same')(input_tensor)
  encoder = layers.BatchNormalization()(encoder)
  encoder = layers.Activation('relu')(encoder)
  encoder = layers.Conv2D(num_filters, (3, 3), padding='same')(encoder)
  encoder = layers.BatchNormalization()(encoder)
  encoder = layers.Activation('relu')(encoder)
  return encoder
def encoder_block(input_tensor, num_filters):
  encoder = conv_block(input_tensor, num_filters)
  encoder_pool = layers.MaxPooling2D((2, 2), strides=(2, 2))(encoder)
  return encoder_pool, encoder
def decoder_block(input_tensor, concat_tensor, num_filters):
  decoder = layers.Conv2DTranspose(num_filters, (2, 2), strides=(2, 2), padding='same')(input_tensor)
  decoder = layers.concatenate([concat_tensor, decoder], axis=-1)
  decoder = layers.BatchNormalization()(decoder)
  decoder = layers.Activation('relu')(decoder)
  decoder = layers.Conv2D(num_filters, (3, 3), padding='same')(decoder)
  decoder = layers.BatchNormalization()(decoder)
  decoder = layers.Activation('relu')(decoder)
  decoder = layers.Conv2D(num_filters, (3, 3), padding='same')(decoder)
  decoder = layers.BatchNormalization()(decoder)
  decoder = layers.Activation('relu')(decoder)
  return decoder
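# Assemble the full U-Net: five encoder stages, a center block, five decoder stages
# with skip connections, and a final 1x1 convolution with softmax over the 5 classes.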
def get_model():
  n = 4
  inputs = layers.Input(shape=[None, None, len(BANDS)])
  encoder0_pool, encoder0 = encoder_block(inputs, n)
  encoder1_pool, encoder1 = encoder_block(encoder0_pool, n*2)
  encoder2_pool, encoder2 = encoder_block(encoder1_pool, n*4)
  encoder3_pool, encoder3 = encoder_block(encoder2_pool, n*8)
  encoder4_pool, encoder4 = encoder_block(encoder3_pool, n*16)
  center = conv_block(encoder4_pool, n*32)  # center
  decoder4 = decoder_block(center, encoder4, n*16)
  decoder3 = decoder_block(decoder4, encoder3, n*8)
  decoder2 = decoder_block(decoder3, encoder2, n*4)
  decoder1 = decoder_block(decoder2, encoder1, n*2)
  decoder0 = decoder_block(decoder1, encoder0, n)
  outputs = layers.Conv2D(5, (1, 1), activation='softmax')(decoder0)
  model = models.Model(inputs=[inputs], outputs=[outputs])
  model.summary()
  model.compile(
      optimizer='ADAM',
      loss=losses.categorical_crossentropy,
      metrics=[metrics.categorical_accuracy])
  return model
# Specify inputs (Landsat bands) to the model and the response variable.
opticalBands = ['B2', 'B3', 'B4', 'B5', 'B6', 'B7']
BANDS = opticalBands
RESPONSE = ['cloud','shadow','snow','water','land']
FEATURES = BANDS + RESPONSE
# Specify the size and shape of patches expected by the model.
KERNEL_SIZE = 256
KERNEL_SHAPE = [KERNEL_SIZE, KERNEL_SIZE]
COLUMNS = [
tf.io.FixedLenFeature(shape=KERNEL_SHAPE, dtype=tf.float32) for k in FEATURES
]
FEATURES_DICT = dict(zip(FEATURES, COLUMNS))
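# FEATURES_DICT maps every band and response name to a fixed 256x256 float feature,
# matching the patch structure written by the export in the previous post.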
# Specify model training parameters.
TRAIN_SIZE = 6000  # roughly 80 * 100 patches * 0.7 training split; adjust to match your export
BATCH_SIZE = 16
EPOCHS = 16
BUFFER_SIZE = 4000
# local folder where the TFRecords were copied from the GCP bucket
data_path = "/path/to/data/"
# import the training, testing and validation records
training_files = data_path + '/training/train*'
testing_files = data_path + '/testing/test*'
validation_files = data_path + '/validation/val*'
training_ds = get_training_dataset(training_files)
testing_ds = get_training_dataset(testing_files)
validation_ds = get_training_dataset(validation_files,False)
model = get_model()
model.fit(
    x=training_ds,
    epochs=EPOCHS,
    steps_per_epoch=int(TRAIN_SIZE / BATCH_SIZE),
    validation_data=testing_ds,
    validation_steps=100)
print(model.evaluate(x=validation_ds))
# save model
MODEL_DIR = '/path/to/save/qamodel/'
tf.saved_model.save(model, MODEL_DIR)
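Before moving on, it can be worth checking that the SavedModel reloads and predicts correctly. Below is a minimal sketch, assuming the MODEL_DIR and validation_ds defined above; it reloads the model and runs its default serving signature on a single batch of validation patches.

import tensorflow as tf

# Reload the SavedModel and grab its default serving signature.
imported = tf.saved_model.load(MODEL_DIR)
infer = imported.signatures['serving_default']

# The signature takes a single named input; look the name up rather than hard-coding it.
input_key = list(infer.structured_input_signature[1].keys())[0]

# Run one batch of validation patches through the signature and inspect the output shape.
for patches, labels in validation_ds.take(1):
  preds = infer(**{input_key: patches})
  print({name: tensor.shape for name, tensor in preds.items()})  # expect (batch, 256, 256, 5)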
After the model has finished training, we need to make it readable for Earth Engine.
from tensorflow.python.tools import saved_model_utils
MODEL_DIR = r'/path/to/qamodel'
label = "landclass"
meta_graph_def = saved_model_utils.get_meta_graph_def(MODEL_DIR, 'serve')
inputs = meta_graph_def.signature_def['serving_default'].inputs
outputs = meta_graph_def.signature_def['serving_default'].outputs
# Just get the first thing(s) from the serving signature def. i.e. this
# model only has a single input and a single output.
input_name = None
for k, v in inputs.items():
  input_name = v.name
  break

output_name = None
for k, v in outputs.items():
  output_name = v.name
  break
# Make a dictionary that maps Earth Engine outputs and inputs to
# AI Platform inputs and outputs, respectively.
import json
input_dict = "'" + json.dumps({input_name: "array"}) + "'"
output_dict = "'" + json.dumps({output_name: label}) + "'"
print(input_dict)
print(output_dict)
We use the input and output names printed by the code above to run the command below.
earthengine model prepare --source_dir /path/to/qamodel/ --dest /path/to/qaEefied --input "{\"serving_default_input_1:0\":\"array\"}" --output "{\"StatefulPartitionedCall:0\":\"landclass\"}"
Now we upload the prepared model folder to a Cloud Storage bucket and push it to AI Platform with the command below. You may need to create your project and model on the Cloud Platform first.
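The upload step itself can be done with gsutil; the bucket and folder names here are just placeholders that match the --origin flag in the next command:

gsutil -m cp -r /path/to/qaEefied gs://mybucket/qaEefied

Then create the model version: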
gcloud ai-platform versions create mymodelversion --project projectName --model modelName --origin gs://mybucket/qaEefied --runtime-version=2.3 --framework "TENSORFLOW" --python-version=3.7
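For reference, once the version is created, the Earth Engine Python API can connect to the hosted model with ee.Model.fromAiPlatformPredictor. The snippet below is only a sketch: the project, model and version names mirror the gcloud command above, while landsat_image, the tile size, overlap and projection are placeholders to adapt to your own data.

import ee
ee.Initialize()

# Connect Earth Engine to the hosted model. Names match the gcloud command above;
# tile size, overlap and projection are illustrative values, not requirements.
hosted_model = ee.Model.fromAiPlatformPredictor(
    projectName='projectName',
    modelName='modelName',
    version='mymodelversion',
    inputTileSize=[144, 144],
    inputOverlapSize=[8, 8],
    proj=ee.Projection('EPSG:4326').atScale(30),
    fixInputProj=True,
    outputBands={'landclass': {'type': ee.PixelType.float()}})

# 'landsat_image' stands for whichever Landsat 8 image you want to classify;
# the model expects the same six bands used during training.
array_image = landsat_image.select(['B2', 'B3', 'B4', 'B5', 'B6', 'B7']).toArray()
predictions = hosted_model.predictImage(array_image)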