tensorflow 六种方法构建读入batch样本(含序列特征处理),踩坑经验值得收藏
书接上文,对 图机器学习算法 感兴趣 的 同学 可以去 图算法十篇 之 图机器学习系列文章总结 这里查看,对 推荐广告算法 感兴趣 的同学 可以去这里 系列小作文之企业级机器学习pipline总结 查看,干货多多 哦! 而对 使用 tensorflow 实现 复杂机器学习/深度学习 模型 感兴趣 的同学, 欢迎关注 算法全栈之路 的 公众号 接下来 逐步更新 的 模型手把手系列 的文章~
本篇是 模型手把手系列 的第二篇文章,本系列的上一篇文章 模型手把手系列开篇 之 python、spark 和 java 生成TFrecord 中 我们 主要说明 了 如何用 多种方式 生成 tensorflow 官方推荐 的 数据格式 tfrecord 的 方法,而 本章 我们则 将继续 看看 tensorflow 如何 读取 各种类型的特征,特别是序列特征数据 ,并使用 多种方法生成 batch 训练样本,代码 涵盖 tensorflow 1.x 系列 和 tensorflow 2.x 系列 的 方法,走过 路过 不能错过 哦 !
作者 花了 大量时间 来 整理 本文章里 介绍的 多种方法 的源码,是因为 在 当初写 图算法相关的文章之 图上 deepwalk 算法理论与实战,图算法之瑞士军刀篇(一) 以及 图上 deepwalk 算法理论 与 tensorflow keras 实战,图算法之瑞士军刀篇(二) 这 两篇 文章的时候, 小小的 batch 数据生成,坑死了我这个混迹于国内外互联网大厂多年的算法老同志~ ,有些问题 没遇到不算事,遇到了 找bub 真是要了我的小命了。 闲言少叙 ,就看文章干不干,转需吧 ~
本文主要讲解了 6 种 用tensorflow 1.x / 2.x 如何读取 训练数据,特别是 序列特征数据 的 处理方法。因为 这些方法 有着 各自的应用场景 和 各自使用特点,算是 对 上次遇坑 的报复性解决心理 吧,这里 我 全部 把开发列举 出来了,希望 可以 切实的帮助到 同样 遇到问题的老哥 。
老话说得好: 代码是表达程序员思想的最好语言。本文的 数据读入代码 ,刻意 剖析了 使用 tensorflow 多种方法读入 用户历史行为序列特征 的过程 ,代码 每个 单元cell 均可以 独立完美 运行成功,具有极高的 参考价值 哦。详细内容 直接 看代码 吧!~~
(1)代码时光
本文共介绍了 6种 tensorflow 读取数据 并 batch 训练 的方法,包括使用 slice_input_producer、from_tensor_slices、generate、interleave
以及 自定义 生成batch 数据 等方法,下面就让我们 一种一种方法 的 介绍吧,总有一种适合你的。
(1.0) 数据准备
本文 用到的 数据,从 内存中 读取csv的,我们在这里直接列出; 而使用到 tfrecord 的,我们则使用的上文 模型手把手系列开篇 之 python、spark 和 java 生成TFrecord 中python方法单机版生成的tfrecord 数据。
@ 欢迎关注作者公众号 算法全栈之路
import pandas as pd
raw_df = pd.DataFrame([[28,12.1,'male',[1,2],1], [30,8.7, 'female',[3,4,5],0], [32,24.6,'female',[6,7,8,9,10],1]], columns=['age', 'price','sex','click_list','label'])
# 序列特征长度不够填充,使用 tf.train.batch 生成 batch 必须要定长序列
max_len=5
padding_value=0
raw_df['click_list'] = raw_df['click_list'].apply(lambda x: x + [padding_value]*(max_len- len(x)))
raw_df['click_list_str'] = raw_df['click_list'].apply(lambda x: '#'.join(map(str, x)))
# 普通特征处理
raw_df['age'] = raw_df['age'].astype(str)
raw_df['sex'] = raw_df['sex'].astype(str)
raw_df['label'] = raw_df['label'].astype(str)
print(raw_df)
raw_df.to_csv("read_sample.csv",sep='t',index=False)
代码 很 简单,我就 不赘述 了。
中间要 注意的是:click_list 这一列特征就是 序列特征 ,每个用户 的 历史行为序列 的长度 并非定长 ,但是在某些方法里, 生成batch特征的时候,要求list 类型的数据是定长的 ,所以 我这里 用 默认值 0 进行了 padding 填充 。
(1.1)tensorflow 1.x 使用 slice_input_producer 生成 batch 数据
看代码吧。
@ 欢迎关注作者公众号 算法全栈之路
import tensorflow.compat.v1 as tf
tf.compat.v1.disable_eager_execution()
# 创建输入数据队列
input_queue = tf.train.slice_input_producer(
[raw_df['age'].to_list(), raw_df['price'].to_list(),raw_df['sex'].to_list(), raw_df['click_list'].to_list(),raw_df['label'].to_list()],
shuffle=True
)
# 读取队列中的数据
all_sample_count = len(raw_df)
batch_size = 2
num_threads = 1
min_after_dequeue = 1
all_feature_batch = tf.train.batch(
input_queue,
batch_size=batch_size,
num_threads=num_threads,
capacity=min_after_dequeue + (num_threads + 1) * batch_size
)
# 打印输出结果
with tf.Session() as sess:
# 初始化变量
sess.run(tf.global_variables_initializer())
# 启动队列操作
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(coord=coord)
for i in range(all_sample_count//batch_size):
age_batch, price_batch, sex_batch,click_list_batch,label_batch = sess.run(all_feature_batch)
print(f"age_batch: {age_batch}n price_batch: {price_batch}n sex_batch: {sex_batch} n click_list_batch: {click_list_batch} n label_batch: {label_batch} ")
coord.request_stop()
coord.join(threads)
这里这里的 tf 是 import tensorflow.compat.v1 as tf
,适配于 tensorflow 1.x 系列 的模型。
这里主要用了 tf.train.slice_input_producer 和 tf.train.batch
数据来 生成batch 数据。
还是 重点说下序列特征 列 click_list_batch ,这里 读入的 是一个 历史点击行为序列ID list,是 定长的 int 型 。定长 那就好办了呀 ,直接 接embeding matrix 拿到每个 id 对应 的 embeding 然后扔进模型里去。
这个 cell 里 的 代码是可以 跑通的,如果确实帮助到你了,欢迎 关注作者的公众号 凑个份子~
(1.2)tensorflow 2.0 直接使用 from_tensor_slices 生成 batch 数据
@ 欢迎关注作者公众号 算法全栈之路
import tensorflow as tf
tf.config.run_functions_eagerly(True)
print("eager_status:",tf.executing_eagerly())
import pandas as pd
batch_size = 3
max_len=5
raw_df['click_list'] = raw_df['click_list'].apply(lambda x: '#'.join(map(str, x)))
raw_df['age'] = raw_df['age'].astype(str)
raw_df['price'] = raw_df['price'].astype(str)
dataset = tf.data.Dataset.from_tensor_slices((raw_df[['age', 'price', 'sex', 'click_list']].values, raw_df['label'].values))
dataset = dataset.shuffle(buffer_size=len(raw_df)).batch(batch_size)
# Iterate over the batches
for batch in dataset:
features, labels = batch
# 定位到 序列特征所在位置
str_list_batch = features[:,3:4]
list_feature=tf.strings.split(str_list_batch,"#")
# 输出是一个SparseTensorValue对象
# https://blog.csdn.net/ustbbsy/article/details/116644136
print("ccccc:",list_feature.values)
print(list_feature.shape)
print('Features:', features)
# 另一种定位序列特征的方式
print('Features(1):', features[1][3])
print('Labels:', labels)
print()
注意: 因为 我本机mac 的 tensorflow 版本是 2.6.0 的版本,所以这里tf默认就是2.6.0了。
我们 可以使用
tf.config.run_functions_eagerly(True)
print("eager_status:",tf.executing_eagerly())
来确认 是否启动了 tensorflow 2.x系列的 eager 模式 。
这里还是 重点说一些 序列特征吧,这里读入的是 把序列特征拼接成一个字符串 ,然后在对 每个batch里 进行 字符串的分割,我们 这里用到的 方法 是 :
str_list_batch = features[:,3:4]
list_feature=tf.strings.split(str_list_batch,"#")
注意 tf.strings.split
的 返回是一个 SparseTensorValue
对象, .values
属性可以拿到具体的值。
因为是 把序列特征拼接成了字符串,所以 我们这里 不要求序列长度是定长 的,非定长的序列特征处理 得到 SparseTensorValue
之后,我们可以使用 tf.Variable 或 tf.keras.layers.Embedding
来创建该嵌入矩阵。 最后,我们可以使用 tf.nn.embedding_lookup_sparse()
函数 来获取 嵌入向量。
最后在强调一点 就是:对于支持 eager模式的 dataset, 我们可以直接用for循环以及dict 来获取对应特征的取值 哦,非常方便,非常强大 ,使用前 注意确认 eager模式 是否开启。
(1.3)使用 dataset 的 generate 生成 batch 数据
对于 数据量不太大 的训练数据,很多同学 还是 习惯 使用 python 的 yeild 来 构建generator , 所以 我们 也提供 了 基于 generator 来 生成 batch 样本 的方法,看代码吧 ~
@ 欢迎关注作者公众号 算法全栈之路
import tensorflow as tf
import pandas as pd
import numpy as np
# 创建一个虚拟的 pandas dataframe
df = pd.DataFrame({
'float_col': np.random.rand(3),
'int_col': np.random.randint(0, 10, size=(3)),
'str_col': ['string{}'.format(i) for i in range(3)],
'list_col': [[i, i+1] for i in range(3)]
})
print(df)
# 创建一个生成器函数,用于将 pandas dataframe 转换为 Tensorflow 数据集
def generator():
for index, row in df.iterrows():
yield (
{
'float_input': row['float_col'],
'int_input': row['int_col'],
'str_input': row['str_col'],
'list_input': row['list_col']
},
row['int_col'] # 将 int_col 作为标签
)
# 创建 Tensorflow 数据集
dataset = tf.data.Dataset.from_generator(generator,
output_signature=(
{
'float_input': tf.TensorSpec(shape=(), dtype=tf.float32),
'int_input': tf.TensorSpec(shape=(), dtype=tf.int32),
'str_input': tf.TensorSpec(shape=(), dtype=tf.string),
'list_input': tf.TensorSpec(shape=(2,), dtype=tf.int32)
},
tf.TensorSpec(shape=(), dtype=tf.int32)
))
# 对数据进行批次处理
batch_size = 8
dataset = dataset.batch(batch_size)
# 打印数据集中的第一个批次
for feature_batch, label_batch in dataset:
print('float_input:', feature_batch['float_input'])
print('int_input:', feature_batch['int_input'])
print('str_input:', feature_batch['str_input'])
print('list_input:', feature_batch['list_input'])
print('label:', label_batch)
这里的 重点 依然是 序列特征的处理 ,对于 定长 以及 非定长 的 序列特征,本文前面均 进行了 说明,这里 我就不在 强调 了,往上翻去 找找 就可以 看到哦。
(1.4)使用dataset 的 interleave 接口去读取 txt 样本文本文件
接下来 要介绍 的 两种方法,才是 我们在 工业上 大数据场景下 实际使用的 非常多的 特征数据 读入方法,看代码吧~
@ 欢迎关注作者公众号 算法全栈之路
import tensorflow as tf
print("eager_status:",tf.executing_eagerly())
tf.config.run_functions_eagerly(True)
# 训练集所有的列
TRAIN_SET_ALL_COLUMNS=["age", "price", "sex", "click_list", "label", "click_list_str"]
# 没有用到的列,这里把去掉
TRAIN_SET_USELESS_COLUMN_NAMES=['click_list']
# 并行度
NUM_PARALLEL_FOR_DATASET=1
BATCH_SIZE=2
def parse_txt_line(line, label_dtype):
if label_dtype == tf.dtypes.float32:
label_default_value = 0.0
else:
label_default_value = 0
# int64类型的默认值,用long(0)也不好使,要设置一个真正大于int32的数值
# 默认值个数必须和读入个数一致,很重要 ValueError: not enough values to unpack (expected 12, got 4)
# 整数默认是 [1 << 32]
# 默认值很重要,格式不对会导致这个问题
# ValueError: Column dtype and SparseTensors dtype must be compatible. key: adid, column dtype:
# <dtype: 'string'>, tensor dtype: <dtype: 'int64'>
field_defaults = [ [""], [""], [""], [""],[label_default_value],[""]]
# 从csv格式中解析出这些字段
age, price, sex, click_list, label, click_list_str = tf.io.decode_csv(line, field_defaults, field_delim="t")
# 对一些字段使用 tf.cast 进行类型转换,这里完全不需要,下游有进行hash
# adid = tf.cast(adid, tf.dtypes.int32)
# | 号分隔, tf.strings.to_number 把字符串转化为默认浮点数
# user_click_seq = tf.strings.to_number(tf.strings.split(user_click_seq, sep="|"))
label = tf.cast(label, tf.int64)
fields_values = [age, price, sex, click_list, label, click_list_str]
features = dict(zip(TRAIN_SET_ALL_COLUMNS, fields_values))
# 没有用到de列,需要pop出去
for useless_column_name in TRAIN_SET_USELESS_COLUMN_NAMES:
features.pop(useless_column_name)
label = features.pop("label")
# 返回一个dict{feature_name,value} 和 label
return features, label
def get_text_dataset(data_set_path_list, label_dtype):
filenames_dataset = tf.data.Dataset.from_tensor_slices(data_set_path_list)
raw_dataset = filenames_dataset.interleave(
# 2个线程并行去读 TextLineDataset
lambda x: tf.data.TextLineDataset(x, num_parallel_reads=NUM_PARALLEL_FOR_DATASET),
# NUM_PARALLEL_FOR_DATASET=2
cycle_length=NUM_PARALLEL_FOR_DATASET,
block_length=BATCH_SIZE,
num_parallel_calls=NUM_PARALLEL_FOR_DATASET
)
raw_dataset = raw_dataset.
map(lambda line: parse_txt_line(line, label_dtype), num_parallel_calls=NUM_PARALLEL_FOR_DATASET).
apply(tf.data.experimental.ignore_errors())
# 格式 dict(fea_name,value) , label
return raw_dataset
train_set_path_list=["read_sample.csv"]
train_raw_dataset = get_text_dataset(train_set_path_list, label_dtype=tf.dtypes.int64)
for feature_batch, label_batch in train_raw_dataset:
print(feature_batch['age'])
print(label_batch)
这里的 代码 是 工业大数据场景下 常用的方法,我们使用 tf.data.Dataset.from_tensor_slices
接口,一般会 先使用 tf.io.gfile
相关的接口 读取到 hdfs 大数据集群上的 文件路径 ,然后 tf.data.TextLineDataset
去 并行读取,这里 的 方法 主要调用了 parse_txt_line
这个方法 来解析单行 的 样本文件。
这里的 序列特征,我们可以在 parse_txt_line
用 python方法 把处理成 list 数据,但是 要求定长,具体方法 看本文 开始的时候 的处理方法。当然,也可以在 获得 batch 得时候 用 tf.strings.split
进行处理,和上面开篇 第二种 方法一样。
更近一步,甚至 我们可以 将 序列特征字符串 一直 放到 模型里 去处理 都是可以的。
(1.5) 使用dataset 的 interleave 接口去读取 tfrecord 文件
这个方法是 企业级机器学习pipline 处理大数据量下 模型训练 用到最多 的方法,甚至 tfrecord 能够 兼容语音图像 等格式,这一块 感兴趣 的 同学自己下去 查看资料 吧,我们 这里 主要介绍的 都是 数值以及字符串列表 等 搜广推算法 更多用到 的 特征数据。
@ 欢迎关注作者公众号 算法全栈之路
import tensorflow as tf
print("eager_status:",tf.executing_eagerly())
tf.config.run_functions_eagerly(True)
# 并行度
NUM_PARALLEL_FOR_DATASET=1
BATCH_SIZE=2
def get_tf_record_dataset(data_set_path_list,shuffle=True):
files = tf.data.Dataset.list_files(data_set_path_list, shuffle=shuffle)
dataset = files.apply(
tf.data.experimental.parallel_interleave(
lambda x: tf.data.TFRecordDataset(x, num_parallel_reads=NUM_PARALLEL_FOR_DATASET),
cycle_length=NUM_PARALLEL_FOR_DATASET,
block_length=BATCH_SIZE,
sloppy=False
)
)
# parsing_spec 是一个字典, 它提供了每个特征到 "FixedLenFeature" 或 "VarLenFeature" 的映射
parsing_spec = {
'age': tf.io.FixedLenFeature([1], tf.int64),
'price': tf.io.FixedLenFeature([1],tf.float32),
'gender': tf.io.FixedLenFeature([1], tf.string),
'click_list': tf.io.VarLenFeature(tf.int64),
'label': tf.io.FixedLenFeature([1],tf.int64)
}
def read_batch(serialized):
feature = tf.io.parse_example(serialized, features=parsing_spec)
label = feature['label']
return feature, {"label": label}
raw_tfrecord_data = dataset.map(read_batch, NUM_PARALLEL_FOR_DATASET)
# 格式 dict(fea_name,value), label
return raw_tfrecord_data
train_set_path_list=["py_tf_record"]
train_raw_dataset = get_tf_record_dataset(train_set_path_list)
for feature_batch, label_batch in train_raw_dataset:
print("age:",feature_batch['age'])
# 这里的 click_list 返回的是一个 SparseTensor, 用 .values 方法可以得到值。
print("click_list:",feature_batch['click_list'].values)
print('label:',label_batch)
特别推荐 这里介绍 的 处理数据 的方法,将训练数据 保存为 tfrecord 格式,不仅 速度快 而且 节省存储 空间,对 生成 tfrecord 数据 不熟悉的 同学,可以 去看 作者的 上一篇 文章 模型手把手系列开篇 之 python、spark 和 java 生成TFrecord 。
这里 重点要强调 的是 parsing_spec
和 read_batch
方法,parsing_spec 中 定义来 定长和变长 tfrecord 数据 的 解析方法,非常优秀,读出来得 序列特征 是 变长的 SparseTensor
, 后面 处理得到 embeding
的方法,可与参考 上面文章 介绍的 SparseTensor 得到 embeding 得 部分内容 哦,这里我 就也 不再赘述 了。
本文 到这里 ,我们 共介绍了 5 种 tensorflow 读取数据 的方法,后两种 为 工业大数据模型 训练场景下 的 算法利器,强烈推荐。
加上 图上 deepwalk 算法理论与实战,图算法之瑞士军刀篇(一) 文章里使用的 自定义生成 batch 数据 的方法,共有 6种方法 来 适配不同的 业务数据 读取场景了,可以 算是 集 tensorflow 读取数据 的 大成之作了,每一个 小节 的 代码 均可以 独立运行成功,非常 值得收藏!
到这里, 模型手把手系列开篇 之 tensorflow 六种方法读入batch样本(含序列特征处理), 踩坑经验值得收藏 的 全文 就 写完 了。本文代码 每个模块 均可以 独立跑 成功,总有一款 适合你,希望 可以对你 有 参考作用 ~
码字不易,觉得有收获就动动小手转载一下吧,你的支持是我写下去的最大动力 ~
更多更全更新内容,欢迎关注作者的公众号: 算法全栈之路
- END –