Классификация жестов
Обучение модели ИНС для классификации жестов с использованием радара
В этом примере рассматривается задача классификации жестов руки с использованием сигналов радара. Такая технология может применяться в бесконтактных интерфейсах управления, например, в умных домах, автомобилях или медицинских устройствах. В качестве источника данных используется микродоплеровская информация, полученная от радара, а для классификации применяется глубокая нейронная сеть.
Подготовка к работе
Для начала установим и импортируем все необходимые пакеты
include("$(@__DIR__)/installpackages.jl")
using Pkg
Pkg.instantiate()
using CSV
using DataFrames
using MAT
using Flux
using Random
using Plots
using MLUtils
using Flux
using CUDA
using cuDNN
using OneHotArrays
using Statistics
using StatisticalMeasures
Набор данных
UWB‑Gestures — это открытый датасет динамичных жестов руки. В нём 12 различных движений — махи, вращения, диагональные движения и жест «пусто» — всего 9600 примеров. Для каждого жеста данные записаны одновременно тремя обособленными радарами, расположенными слева, сверху и справа.
Каждый элемент датасета состоит из трёх тензоров, соответствующих измерениям с трёх различных ракурсов (левый, верхний, правый). Эти данные представлены как окна фиксированной длины с определённым числом признаков в каждом кадре. У каждого семпла есть две метки: числовой идентификатор жеста и его текстовое описание.
Набор данных находится в архив файле в рабочей директории. Нужно 2 раза кликнуть ЛКМ по архиву для его распаковки
struct GestureDataset
segments::Vector{Tuple{Array{Float32,3},Array{Float32,3},Array{Float32,3},Int,String}}
end
function GestureDataset(groups, gesture_dict)
segs = Tuple{Array{Float32,3},Array{Float32,3},Array{Float32,3},Int,String}[]
for key in sort(collect(keys(groups)))
paths = sort(groups[key])
mats = map(p->Matrix{Float32}(CSV.read(p, DataFrame; delim=',')), paths)
L, T, R = mats
gnum = parse(Int, match(r"_G(\d+)", key).captures[1])
gname = gesture_dict["G$(gnum)"]
nseg = div(size(L,1), slowTimeFrames)
for j in 1:nseg
rows = (j-1)*slowTimeFrames+1 : j*slowTimeFrames
seqL = reshape(L[rows, :], slowTimeFrames, size(L,2), 1)
seqT = reshape(T[rows, :], slowTimeFrames, size(T,2), 1)
seqR = reshape(R[rows, :], slowTimeFrames, size(R,2), 1)
push!(segs, (seqL, seqT, seqR, gnum, gname))
end
end
GestureDataset(segs)
end
Base.length(d::GestureDataset) = length(d.segments)
function Base.getindex(d::GestureDataset, i::Int)
seqL, seqT, seqR, gnum, gname = d.segments[i]
return seqL, seqT, seqR, gnum, gname, gname
end
Определим функции min-max нормализации, поскольку для более качественной сходимости данные перед подачей в модель рекомендуется нормировать
normalize_rows(mat::AbstractMatrix) = (mat .- minimum(mat, dims=2)) ./ (maximum(mat, dims=2) .- minimum(mat, dims=2));
Строки и столбцы в каждой матрице радарного сигнала представляют соответственно длительность жеста руки и расстояние руки от радара. Во время сбора данных испытуемый повторял определённый жест руки в течение 450 секунд, что соответствует 9000 строкам по медленному времени. Один полный жест занимает 90 отсчётов по медленному времени. Таким образом, каждая матрица радарного сигнала содержит 100 полных примеров движения руки. Диапазон действия каждого UWB-радара составляет 1,2 метра, что соответствует 189 отсчётам по быстрому времени.
slowTimeFrames = 90;
recordedTimePerSample = 4.5;
radarRange = 1.2;
Далее код группирует пути к CSV-файлам с радарными данными по жестам.
base_dir = "$(@__DIR__)/data/data"
hv_dirs = filter(d -> occursin(r"^HV_\d+$", d), readdir(base_dir))
groups = Dict{String, Vector{String}}()
for hv in hv_dirs
clean_hv = replace(hv, "_" => "")
subdir = joinpath(base_dir, hv, "ClutterRemovedData_$clean_hv")
for f in readdir(subdir; join=true)
filename = splitpath(f)[end]
m = match(r"^(HV\d+_G\d+)_Radar", filename)
key = m.captures[1]
push!(get!(groups, key, String[]), f)
end
end
Определим код и названия для каждого жеста, а также создадим сам датасет
gesture_codes = ["G1","G2","G3","G4","G5","G6","G7","G8","G9","G10","G11","G12"]
gesture_names = [
"движение слева направо",
"движение справа налево",
"движение сверху вниз",
"движение снизу вверх",
"диагональное движение слева направо сверху вниз",
"диагональное движение слева направо снизу вверх",
"диагональное движение справа налево сверху вниз",
"диагональное движение справа налево снизу вверх",
"по часовой стрелке",
"против часовой стрелки",
"толчок внутрь",
"нет жеста"
]
gesture_dict = Dict(code => name for (code, name) in zip(gesture_codes, gesture_names))
ds = GestureDataset(groups, gesture_dict)
println(length(ds))
Далее необходимо сформировать тренировочный, валидационный и тестовый загрузчики данных
obs = collect(1:length(ds))
train_obs, val_obs, test_obs = splitobs(obs, at=(0.7, 0.15))
Xl_train = [ ds[i][1] for i in train_obs ]
Xt_train = [ ds[i][2] for i in train_obs ]
Xr_train = [ ds[i][3] for i in train_obs ]
Y_train = [ ds[i][4] for i in train_obs ]
Xl_val = [ ds[i][1] for i in val_obs ]
Xt_val = [ ds[i][2] for i in val_obs ]
Xr_val = [ ds[i][3] for i in val_obs ]
Y_val = [ ds[i][4] for i in val_obs ]
Xl_test = [ ds[i][1] for i in test_obs ]
Xt_test = [ ds[i][2] for i in test_obs ]
Xr_test = [ ds[i][3] for i in test_obs ]
Y_test = [ ds[i][4] for i in test_obs ]
batchsize = 64
train_data = collect(zip(Xl_train, Xt_train, Xr_train, Y_train))
train_loader = DataLoader(train_data; batchsize=batchsize, shuffle=true)
val_data = collect(zip(Xl_train, Xt_train, Xr_train, Y_train))
val_loader = DataLoader(val_data; batchsize=batchsize, shuffle=true)
test_data = collect(zip(Xl_train, Xt_train, Xr_train, Y_train))
test_loader = DataLoader(test_data; batchsize=batchsize, shuffle=true);
Архитектура ИНС
MisoCNN состоит из трёх параллельных веток, каждая из которых принимает одно-канальный входной тензор размером 90×189 и пропускает его через четыре последовательных блока Conv3×3→ReLU→BatchNorm. Затем выходы трёх веток складываются по новому измерению.
function makeBranch()
Chain(
Conv((3,3), 1=>8, pad=1), BatchNorm(8), relu, x->maxpool(x, (2,2)),
Conv((3,3), 8=>16, pad=1), BatchNorm(16), relu, x->maxpool(x, (2,2)),
Conv((3,3),16=>32, pad=1), BatchNorm(32), relu, x->maxpool(x, (2,2)),
Conv((3,3),32=>64, pad=1), BatchNorm(64), relu, x->maxpool(x, (2,2)),
)
end
H, W = 90, 189
outH, outW = H÷16, W÷16
feat = outH*outW*64
head = Chain(
x -> sum(x, dims=5),
x -> reshape(x, feat, :),
Dense(feat, 12)
)
struct MisoCNN
b::Vector{Chain}
h::Chain
end
function MisoCNN()
branches = [makeBranch() for _ in 1:3]
MisoCNN(branches, head)
end
function (m::MisoCNN)(x1,x2,x3)
y1 = m.b[1](x1)
y2 = m.b[2](x2)
y3 = m.b[3](x3)
stacked = cat(y1, y2, y3; dims=5)
m.h(stacked)
end
Flux.@functor MisoCNN
Инициализируем модель и переведем ее сразу на используемый девайс
model = MisoCNN() |> gpu;
Обучение модели
Зададим скорость обучения, функцию потерь и оптимизатор
lr = 0.0001
lossFunction(x, y) = Flux.Losses.logitcrossentropy(x, y);
opt = Flux.Adam(lr, (0.9, 0.99));
Classes = 1:length(gesture_codes);
Определим функцию тренировки и валидации модели
function train!(
model, train_loader, opt, loss_fn,
device, epoch::Int, num_epochs::Int
)
running_loss = 0.0
n_batches = 0
for (i, data) in enumerate(train_loader)
xs1 = cat((b[1] for b in data)...; dims=4) |> gpu
xs2 = cat((b[2] for b in data)...; dims=4) |> gpu
xs3 = cat((b[3] for b in data)...; dims=4) |> gpu
ys = onehotbatch([b[4] for b in data], Classes) |> gpu
loss_val, gs = Flux.withgradient(Flux.params(model)) do
ŷ = model(xs1, xs2, xs3)
loss_fn(ŷ, ys)
end
Flux.update!(opt, Flux.params(model), gs)
running_loss += Float64(loss_val)
n_batches += 1
end
train_loss = running_loss / max(n_batches, 1)
return opt, train_loss
end
function validate(model, val_loader, loss_fn, device)
Flux.testmode!(model)
running_loss = 0.0
preds = nothing
targets = nothing
n_batches = 0
for data in val_loader
xs1 = cat((b[1] for b in data)...; dims=4) |> gpu
xs2 = cat((b[2] for b in data)...; dims=4) |> gpu
xs3 = cat((b[3] for b in data)...; dims=4) |> gpu
ys = onehotbatch([b[4] for b in data], Classes) |> gpu
ŷ = model(xs1, xs2, xs3)
loss_val = loss_fn(ŷ, ys)
running_loss += Float64(loss_val)
n_batches += 1;
if preds === nothing
preds = ŷ
targets = ys
else
preds = hcat(preds, ŷ)
targets = hcat(targets, ys)
end
Flux.trainmode!(model)
end
return running_loss / max(n_batches, 1)
end;
Запустим цикл тренировки модели
no_improve_epochs = 0
best_model = nothing
train_losses = [];
valid_losses = [];
best_val_loss = Inf;
num_epochs = 50
for epoch in 1:num_epochs
println("-"^50 * "\n")
println("EPOCH $(epoch):")
opt, train_loss = train!(
model, train_loader, opt,
lossFunction, device, 1, num_epochs
)
val_loss = validate(model, val_loader, lossFunction, gpu)
if val_loss < best_val_loss
best_val_loss = val_loss
best_model = deepcopy(model)
end
println("Epoch $epoch/$num_epochs | train $(round(train_loss, digits=4)) | val $(round(val_loss, digits=4))")
push!(train_losses, train_loss)
push!(valid_losses, val_loss)
end
Тестирование модели
Посчитаем точность обученной модели на тестовых данных, с которыми модель еще не сталкивалась для оценки ее эффективности
preds = []
target = []
for data in test_loader
xs1 = cat((b[1] for b in data)...; dims=4) |> gpu
xs2 = cat((b[2] for b in data)...; dims=4) |> gpu
xs3 = cat((b[3] for b in data)...; dims=4) |> gpu
ys = [b[4] for b in data]
ŷ = best_model(xs1, xs2, xs3)
ŷ = cpu(ŷ)
ys = cpu(ys)
ŷ = softmax(ŷ)
ŷ = onecold(ŷ)
push!(preds, ŷ)
push!(target, ys)
end
ŷ = vcat(preds...)
y = vcat(target...)
acc = mean(ŷ .== y)
println("Accuracy = $(round(acc*100, digits=2)) %")
Точность предсказания модели - 97%
Визуализируем графики потерь для тренировки и валидации
plot(train_losses, label="train loss", lw=3)
plot!(valid_losses, label="valid loss", lw = 3)
xlabel!("Эпоха")
ylabel!("Ошибка")
На графике функций потерь видно, что модель сходится без признаков переобучения
Определим функцию для построения матрицы ошибок для оценки модели
function plot_confusion_matrix(C, gesture_names)
heatmap(
C,
title = "Confusion Matrix",
xlabel = "Predicted",
ylabel = "True",
xticks = (1:length(gesture_names), gesture_names),
yticks = (1:length(gesture_names), gesture_names),
c = :viridis,
colorbar_title = "Count",
size = (600, 400)
)
for i in 1:size(C, 1)
for j in 1:size(C, 2)
annotate!(j, i, text(C[i, j], :white, 12, :bold))
end
end
display(current())
end;
Визуализируем матрицу ошибок
conf_matrix = StatisticalMeasures.ConfusionMatrices.confmat(ŷ, y)
conf_matrix = StatisticalMeasures.ConfusionMatrices.matrix(conf_matrix)
plot_confusion_matrix(conf_matrix, gesture_codes)
Как видно на тестовом множестве модель достаточно точно выполняет классификацию. Потеря точности происходит из-за схожести классов G11, G12
Выводы
В данном примере была обучена сверточная нейронная сеть для классификации жестов по данным, полученным с трех радаров. Точность модели - 97%.