Генерация спектрограмм с помощью DDPM
Введение
Генеративные модели активно развиваются и находят применение в различных областях, включая обработку сигналов и синтез данных. Одной из таких моделей является Diffusion Probabilistic Model (DDPM), которая демонстрирует высокое качество генерации за счет постепенного процесса добавления и удаления шума. В данном демонстрационном примере рассматривается применение DDPM для синтеза спектрограмм, полученных из исходных сигналов.
Такой подход позволяет создавать новые реалистичные спектрограммы, которые могут быть использованы для аугментации данных, синтеза сигналов или в исследовательских целях.
В данном демо примере исходными данными будут являться обработанные ЭКГ и радарные сигналы, которые переводятся в спектрограммы
Подготовка к работе
Сначала импортируем все необходимые пакеты
include("$(@__DIR__)/InstallPackages.jl")
using Pkg; Pkg.instantiate()
using MAT
using Plots
using DenoisingDiffusion
using FFTW
using Images
using ColorSchemes
using SignalAnalysis
using DSP
using Statistics
using Flux
using CUDA, cuDNN
using Printf
using JSON
using BSON
using Optimisers
Определим вспомогательную функцию z-score нормализации для приведения обучаемых данных к диапазону [0-1]
function normalize_zero_to_one(x)
x = (x .- mean(x)) ./ (std(x) + eps())
end
Поскольку исходные данные - сигнал ЭКГ, который собой представляет временной ряд размера порядка 1, то нецелесообразно его использовать как входные данные для ИНС. Вместо этого, будем сигнал будем нарезать на перекрывающиеся окна. использование такого подхода дает следующие преимущества
-
Вычислительная сложность – Обработка всего сигнала целиком требует значительных ресурсов памяти и вычислительной мощности, что затрудняет обучение даже на современных GPU/TPU.
-
Локальная структура сигнала – ЭКГ обладает характерными паттернами (например, комплексами QRS), которые имеют относительно небольшую длительность по сравнению с полной записью. Анализ глобального сигнала без учета локальных особенностей может привести к потере значимой информации.
-
Ограничения архитектуры нейросетей – Большинство современных архитектур (например, U-Net в DDPM) оптимизированы для работы с данными фиксированного и умеренного размера, такими как изображения или короткие сегменты сигналов.
Следующий блок кода реализует разбиение сигнала на окна, каждое окно сохраняется в папку output_dir
input_dir = "notprepareddata"
output_dir = "data"
window_size = 5184
hop = 1000
mkpath(output_dir)
files = filter(p -> occursin(r"^GDN\d{4}_.+\.mat$", basename(p)),
sort(readdir(input_dir; join=true)))
for file in files
data = matread(file)
radar = Float32.(vec(data["radar"]))
ecg = Float32.(vec(data["ecg"]))
N = min(length(radar), length(ecg))
base = basename(file)[1:end-4]
seg = 1
for start in 1:hop:(N - window_size + 1)
idx = start:start + window_size - 1
radar_seg = radar[idx]
ecg_seg = ecg[idx]
out_name = "$(base)_seg$(seg).mat"
matwrite(joinpath(output_dir, out_name),
Dict("radar" => radar_seg, "ecg" => ecg_seg))
seg += 1
end
end
Объявляется структура Dataset
, которая при инициализации просто запоминает список .mat-файлов в указанной папке и параметры STFT. Когда вы обращаетесь к элементу ds[i]
, код читает из i
-го файла сигналы ЭКГ и радара, нормирует их в диапазон 0–1, считает их спектры через DSP.stft
, переводит в децибелы (обрезает до −30 дБ, затем растягивает к 0–1) и фазу - sin φ и cos φ
struct Dataset{T<:Vector{String}, W<:AbstractVector{<:Real}}
files::T
fs::Int
nfft::Int
noverlap::Int
window::W
end
function Dataset(root::AbstractString; fs::Int, nfft::Int, noverlap::Int, window::AbstractVector{<:Real})
files = sort(readdir(root; join=true))
return Dataset(files, fs, nfft, noverlap, window)
end
Base.length(ds::Dataset) = length(ds.files)
function Base.getindex(ds::Dataset, idx::Int)
m = matread(ds.files[idx])
rad = Float32.(vec(m["radar"]))
ecg = Float32.(vec(m["ecg"]))
ecg = normalize_zero_to_one(ecg)
rad = normalize_zero_to_one(rad)
length(rad) == length(ecg) || error("length mismatch")
Xrad = DSP.stft(rad, ds.nfft, ds.noverlap; window = ds.window)
Xecg = DSP.stft(ecg, ds.nfft, ds.noverlap; window = ds.window)
mag_r = 20 .* log10.(abs.(Xrad) .+ 1e-6)
mag_r = clamp.(mag_r, -30, 0)
φ_r = angle.(Xrad); sinφ_r = sin.(φ_r); cosφ_r = cos.(φ_r)
mag_e = 20 .* log10.(abs.(Xecg) .+ 1e-6)
mag_e = clamp.(mag_e, -30, 0)
φ_e = angle.(Xecg); sinφ_e = sin.(φ_e); cosφ_e = cos.(φ_e)
mag_r = (mag_r .+ 30f0) ./ 30f0
mag_e = (mag_e .+ 30f0) ./ 30f0
tens_r = cat(mag_r, sinφ_r, cosφ_r; dims = 3)
tens_e = cat(mag_e, sinφ_e, cosφ_e; dims = 3)
data = cat(tens_r, tens_e; dims = 3)
data = Float32.(data)
data = data[1:64, :, :]
return data, rad, ecg
end
Создадим датасет, используя для STFT окно hamming
fs = 2000
nfft = 128
noverlap = nfft ÷ 2
window = DSP.hamming(nfft)
ds = Dataset("data";fs, nfft, noverlap, window);
Далее визуализируем один из сэмплов
sample, rad, ecg = ds[2435]
mag_r = sample[:, :, 1]
mag_e = sample[:, :, 4]
p1 = heatmap(reverse(mag_r; dims=2); # freq снизу вверх
xlabel = "Frame",
ylabel = "Freq, Hz",
title = "Radar", colorbar=false)
p2 = heatmap(reverse(mag_e; dims=2);
xlabel = "Frame",
ylabel = "",
title = "ECG", colorbar=false)
p3 = plot(rad)
p4 = plot(ecg)
plot(p1, p2,p3, p4, layout = (2,2), size = (1400,400))
В основе DDPM лежит процесс постепенного зашумления данных в процессе обучения, можно визуализировать то, как это работает
mkdir("output")
num_timesteps = 40
βs = linear_beta_schedule(num_timesteps, 8e-6, 9e-3)
diffusion = GaussianDiffusion(Vector{Float32}, βs, (1,), identity);
Xt = mag_e
anim = @animate for t ∈ 1:(num_timesteps + 5)
if t <= num_timesteps
μ = Xt .* sqrt.(1 .- βs[t])
s = sqrt(βs[t])
noise = randn(size(Xt))
global Xt = μ .+ s .* noise
end
p = plot(
vec(Xt),
title = "t = $min(t, num_timesteps)",
xlabel = "sample index",
ylabel = "value",
legend = false
)
end
gif(anim, "output/spiral_forward_1.gif", fps=8)
Создадим валидационный и тренировочный наборы данных
N = length(ds)
train_len = floor(Int, 0.8 * N)
val_len = floor(Int, 0.2 * N)
train_idx = 1:train_len
val_idx = (train_len+1):(train_len+val_len)
X_train = [ ds[i][1] for i in train_idx ]
X_val = [ ds[i][1] for i in val_idx ]
Инициализируем некоторые гиперпараметры обучения, используемое для обучения устройство
model_channels = 32
learning_rate = 0.0001
batch_size = 64
num_epochs = 150
loss_type = Flux.mse;
device = CUDA.functional() ? gpu : cpu
function collate(batch)
cat(batch...; dims = 4)
end
Создадим загрузчики данных
train_loader = Flux.DataLoader(X_train; batchsize=batch_size, shuffle=true, parallel = true,collate = collate);
val_loader = Flux.DataLoader(X_val; batchsize=batch_size, shuffle=false,parallel = true,collate = collate)
Инициализируем модель
in_channels = 6;
data_shape = size(X_train[1])
num_timesteps = 100
model = UNet(in_channels, model_channels, num_timesteps;
block_layer=ResBlock,
num_blocks_per_level=1,
channel_multipliers=(1, 2, 2, 4),
num_attention_heads=4,
)
βs = cosine_beta_schedule(num_timesteps, 0.008)
diffusion = GaussianDiffusion(Vector{Float32}, βs, data_shape, model)
display(diffusion.denoise_fn)
println("")
Инициализируем функцию потерь, оптимизатор, и создадим папку, в которой будут храниться логи обучения
device == gpu ? diffusion = gpu(diffusion) : nothing;
loss(diffusion, x::AbstractArray) = p_losses(diffusion, loss_type, x; to_device=device)
println("defining new optimiser")
opt = Flux.Adam(learning_rate)
println(" ", opt)
opt_state = Flux.setup(opt, diffusion)
output_directory = "output/jsonfiles"
mkpath(output_directory)
Зададим словарь, в котором хранятся параметры экспериметра
println("made directory: ", output_directory)
hyperparameters_path = joinpath(output_directory, "hyperparameters.json")
output_path = joinpath(output_directory, "diffusion_opt.bson")
history_path = joinpath(output_directory, "history.json")
hyperparameters = Dict(
"num_timesteps" => num_timesteps,
"data_shape" => "$(diffusion.data_shape)",
"denoise_fn" => "$(typeof(diffusion.denoise_fn).name.wrapper)",
"parameters" => sum(length, Flux.params(diffusion.denoise_fn)),
"model_channels" => model_channels,
"loss_type" => "$loss_type",
"learning_rate" => learning_rate,
"batch_size" => batch_size,
"optimiser" => "$(typeof(opt).name.wrapper)",
)
open(hyperparameters_path, "w") do f
JSON.print(f, hyperparameters)
end
println("saved hyperparameters to $hyperparameters_path")
Определим функции для обучения ИНС
function update_history!(model, history, loss, val_data; prob_uncond::Float64=0.0)
val_loss = batched_loss(loss, model, val_data; prob_uncond=prob_uncond)
push!(history["val_loss"], val_loss)
@printf("val loss: %.5f", history["val_loss"][end])
println("")
end
function batched_loss(loss, model, data::Flux.DataLoader; prob_uncond::Float64=0.0)
total_loss = 0.0
for x in data
x = gpu(x)
total_loss += loss(model, x)
end
total_loss /= length(data)
end
function train!(loss, model, data::Flux.DataLoader, opt_state, val_data;
num_epochs::Int=100,
save_after_epoch::Bool=false,
save_dir::String="",
prob_uncond::Float64=0.0,
)
history = Dict(
"epoch_size" => length(data),
"mean_batch_loss" => Float64[],
"val_loss" => Float64[],
"batch_size" => data.batchsize,
)
for epoch = 1:num_epochs
@printf("epoch: %.0f ", epoch)
total_loss = 0.0
for (idx, x) in enumerate(data)
x = gpu(x)
loss_val, gs = Flux.withgradient(Flux.params(model)) do
loss(model, x)
end
total_loss += loss_val
Flux.update!(opt, Flux.params(model), gs)
end
if save_after_epoch
path = joinpath(save_dir, "model_epoch=$(epoch).bson")
let model = cpu(model) # keep main model on device
BSON.bson(path, Dict(:model => model))
end
end
push!(history["mean_batch_loss"], total_loss / length(data))
@printf("mean batch loss: %.5f ; ", history["mean_batch_loss"][end])
update_history!(model, history, loss, val_data; prob_uncond=prob_uncond)
end
end
Запустим цикл обучения
train_loader
println("Starting training")
start_time = time_ns()
history = train!(loss, diffusion, train_loader, opt_state, val_loader;
num_epochs=num_epochs, save_after_epoch=false, save_dir=output_directory)
end_time = time_ns() - start_time
println("\ndone training")
@printf "time taken: %.2fs\n" end_time / 1e9
batch = 4 # сколько картинок хотим
shape = diffusion.data_shape # (64, 80, 6)
Сохраним нашу обученную модель
using BSON: @save, @load
mkdir("models")
best_path = joinpath("models", "modelorig.bson")
@save best_path diffusion
Выполним инференс модели. На вход подадим шум из нормального распределения, на выходе получим спектрограммы, которые можно добавить для датасет для его увеличения
@load best_path diffusion
diffusion = device(diffusion)
x0 = p_sample_loop(diffusion, 6; to_device=device)
x0 = cpu(x0)
INDEX_IMG = 1
mag_r = x0[:, :, 1, INDEX_IMG]
mag_e = x0[:, :, 4, INDEX_IMG]
p1 = heatmap(reverse(mag_r; dims=2); # freq снизу вверх
xlabel = "Frame",
ylabel = "Freq, Hz",
title = "Radar", colorbar=false)
p2 = heatmap(reverse(mag_e; dims=2);
xlabel = "Frame",
ylabel = "",
title = "ECG", colorbar=false)
INDEX_IMG = 2
mag_r = x0[:, :, 1, INDEX_IMG]
mag_e = x0[:, :, 4, INDEX_IMG]
p3 = heatmap(reverse(mag_r; dims=2); # freq снизу вверх
xlabel = "Frame",
ylabel = "Freq, Hz",
title = "Radar", colorbar=false)
p4 = heatmap(reverse(mag_e; dims=2);
xlabel = "Frame",
ylabel = "",
title = "ECG", colorbar=false)
plot(p1, p2,p3, p4, layout = (2,2), size = (1400,400))
Выводы
В данном демо-примере была обучена нейросеть для генерации спектрограмм. Сгенерированные данные можно использовать для аугментации датасета