Подходы к проектированию нейронных регуляторов¶
Вебинар Разработка перспективных видов регуляторов для объектов управления в Engee состоял из нескольких примеров:
В этом проекте собраны сопроводительные материалы для третьей части вебинара, остальные вы можете изучить в сообществе по приведенным выше ссылкам.
Пример с обычным PID регулятором¶
Это основная модель, в которой мы будем заменять регулятор.
Модель liquid_pressure_regulator.engee
Адаптивный PID регулятор¶
Первым делом мы настроим адаптивный PID регулятор, коэффициенты которого с каждым временным шагом проходят следующую процедуру обновления:
if abs(e) > c.thr
c.Kp = max(0, c.Kp + clamp(c.α*e*c.pe, -c.ΔK, c.ΔK))
c.Ki = max(0, c.Ki + clamp(c.α*e*c.pie, -c.ΔK, c.ΔK))
c.Kd = max(0, c.Kd + clamp(c.α*e*c.pde, -c.ΔK, c.ΔK))
end
Во-первых, мы не будем реагировать на скачки управляюшего сигнала больше некоторого размера c.thr
(параметр threshold
). Во-вторых, параметр α
ограничивает скорость изменения всех параметроы регулятора, при этом шаг изменения ограничен по модулю параметром ΔK
.
Модель liquid_pressure_regulator_adaptive_pid.engee
Нейронный регулятор на Julia (RNN)¶
В этом регуляторе приведен код рекуррентной нейросети, выполненной на Julia без высокоуровневых библиотек. Его обучение происходит "онлайн", в процессе работы системы, поэтому качество его работы очень сильно зависит от исходных значений его весов. Возможно нужно применить процедуру предобучения, как минимум для того, чтобы нейросеть начала выдавать стабильное значение при стационарном, вначале расчета, состоянии трубопровода.
Модель liquid_pressure_regulator_neural_test.engee
Создаем нейронный регулятор на Си (FNN)¶
Запустим модель и соберем датасет¶
Pkg.add("CSV")
using DataFrames, CSV
engee.open("$(@__DIR__)/liquid_pressure_regulator.engee")
data = engee.run( "liquid_pressure_regulator" )
function simout_to_df( data )
vec_names = [i for i in keys(data) if length(data[i].value) > 0];
df = DataFrame( hcat([collect(data[v]).value for v in vec_names]...), vec_names );
df.time = collect(data[vec_names[1]]).time;
return df
end
df = simout_to_df( data );
CSV.write("Режим 1.csv", df);
plot(
plot( df.time, df.set_point ), plot( df.time, df.control ), plot( df.time, df.pressure ), layout=(3,1)
)
Изменим параметры модели и запустим ее в другом сценарии.
engee.set_param!( "liquid_pressure_regulator/Сигнал утечки", "Amplitude"=>"0.001" )
data = engee.run( "liquid_pressure_regulator" )
df = simout_to_df( data );
CSV.write("Режим 2.csv", df);
plot(
plot( df.time, df.set_point ), plot( df.time, df.control ), plot( df.time, df.pressure ), layout=(3,1)
)
engee.set_param!( "liquid_pressure_regulator/Сигнал утечки", "Amplitude"=>"0.0005", "Frequency"=>"0.2" )
data = engee.run( "liquid_pressure_regulator" )
df = simout_to_df( data );
CSV.write("Режим 3.csv", df);
plot(
plot( df.time, df.set_point ), plot( df.time, df.control ), plot( df.time, df.pressure ), layout=(3,1)
)
Вернем все параметры модели на место
engee.set_param!( "liquid_pressure_regulator/Сигнал утечки", "Amplitude"=>"0.0005" )
engee.set_param!( "liquid_pressure_regulator/Сигнал утечки", "Frequency"=>"0.1" )
Обучим нейросеть аппроксимировать несколько регуляторов¶
Этот код позволяет подготовить данные, задает структуру трехслойной полносвязанной нейросети и обучает ее находить зависимость между прошлыми 20 значениями рассогласования и следующим значением сигнала управления.
Pkg.add(["Flux", "BSON", "Glob", "MLUtils"])
using Flux, MLUtils
using CSV, DataFrames
using Statistics, Random
using BSON, Glob
# Инициализируем генератор случайных чисел ради воспроизводимости эксперимента
Random.seed!(42)
# 1. Подготовим данные
function load_and_preprocess_data_fnn()
# Загрузим все CSV файлы из текущей папки
files = glob("*.csv")
dfs = [CSV.read(file, DataFrame, types=Float32) for file in files]
# Совместим данные в одну таблицу
combined_df = vcat(dfs...)
# Извлечем нужные нам столбцы (time, error, control)
vtime = combined_df.time
error = combined_df.error
control = combined_df.control
# Нормализация данных (очень поможет с ускорением обучения нейросети)
error_mean, error_std = mean(error), std(error)
control_mean, control_std = mean(control), std(control)
error_norm = (error .- error_mean) ./ error_std
control_norm = (control .- control_mean) ./ control_std
# Разделим на небольшие последовательности чтобы обучить RNN
sequence_length = 20 # сколько прошлых шагов мы учитываем для прогноза сигнала управления
X = []
Y = []
for i in 1:(length(vtime)-sequence_length)
push!(X, error_norm[i:i+sequence_length-1])
push!(Y, control_norm[i+sequence_length])
end
# Оформим как массивы
#X = reshape(hcat(X...), sequence_length, 1, :) # С батчами
X = hcat(X...)'
Y = hcat(Y...)'
return (X, Y), (error_mean, error_std, control_mean, control_std)
end
# 2. Определяем структуру модели
function create_fnn_controller(input_size=20, hidden_size=5, output_size=1)
return Chain(
Dense(input_size, hidden_size, relu),
Dense(hidden_size, hidden_size, relu),
Dense(hidden_size, output_size)
)
end
# 3. Обучение с новым API Flux
function train_fnn_model(X, Y; epochs=100, batch_size=32)
# Разделение данных
split_idx = floor(Int, 0.8 * size(X, 1))
X_train, Y_train = X[1:split_idx, :], Y[1:split_idx, :]
X_val, Y_val = X[split_idx+1:end, :], Y[split_idx+1:end, :]
# Создание модели и оптимизатора
model = create_fnn_controller()
optimizer = Flux.setup(Adam(0.001), model)
# Функция потерь
loss(x, y) = Flux.mse(model(x), y)
# Подготовка DataLoader
train_loader = Flux.DataLoader((X_train', Y_train'), batchsize=batch_size, shuffle=true)
# Цикл обучения
train_losses = []
val_losses = []
for epoch in 1:epochs
# Обучение
Flux.train!(model, train_loader, optimizer) do m, x, y
y_pred = m(x)
Flux.mse(y_pred, y)
end
# Расчет ошибки
train_loss = loss(X_train', Y_train')
val_loss = loss(X_val', Y_val')
push!(train_losses, train_loss)
push!(val_losses, val_loss)
# Логирование
if epochs % 10 == 0
@info "Epoch $epoch" train_loss val_loss
end
end
# Визуализация обучения
plot(1:epochs, train_losses, label="Training Loss")
plot!(1:epochs, val_losses, label="Validation Loss")
xlabel!("Epoch")
ylabel!("Loss")
title!("Training Progress")
return model
end
# 4. Оценка модели (без изменений)
function evaluate_fnn_model(model, X, Y, norm_params)
predictions = model(X')
# Денормализация
_, _, control_mean, control_std = norm_params
Y_true = Y .* control_std .+ control_mean
Y_pred = predictions' .* control_std .+ control_mean
# Расчет метрик
rmse = sqrt(mean((Y_true - Y_pred).^2))
println("RMSE: ", rmse)
# Визуализация
plot(Y_true[1:100], label="True Control Signal")
plot!(Y_pred[1:100], label="Predicted Control Signal")
xlabel!("Time Step")
ylabel!("Control Signal")
title!("FNN Controller Performance")
end
# Загрузка данных
(X, Y), norm_params = load_and_preprocess_data_fnn()
# Обучение
model = train_fnn_model(X, Y, epochs=100, batch_size=32)
# Сохранение модели
using BSON
BSON.@save "fnn_controller_v2.bson" model norm_params
# Оценка
evaluate_fnn_model(model, X, Y, norm_params)
Сгенерируем код на Си для полносвязанной нейросети¶
Применим библиотеку Symbolics
чтобы сгенерировать код и выполним несколько доработок, чтобы его можно было вызывать из блока C Function
.
Pkg.add("Symbolics")
using Symbolics
@variables X[1:20]
c_model_code = build_function( model( collect(X) ), collect(X); target=Symbolics.CTarget(), fname="neural_net", lhsname=:y, rhsnames=[:x] )
# Заменим несколько инструкций в коде
c_fixed_model_code = replace( c_model_code,
"double" => "float",
"f0" => "f",
" y[0]"=>"y" );
println( c_fixed_model_code[1:200] )
println("...")
# Оставим только третью строчку от этого кода
c_fixed_model_code = split(c_fixed_model_code, "\n")[3]
c_code_standalone = """
float ifelse(bool cond, float a, float b) { return cond ? a : b; }
$c_fixed_model_code""";
println( c_code_standalone[1:200] )
println("...")
Сохраним код в файл
open("$(@__DIR__)/neural_net_fc.c", "w") do f
println( f, "$c_code_standalone" )
end
Модель liquid_pressure_regulator_neural_fc.engee
Заключение¶
Очевидно что мы обучали нейросети слишком короткое время на слишком небольшом примере. Есть много шагов, которые должны позволить улучшить качество такого регулятора - например, подать на вход нейросети большее количество сигналов, или просто собрать датасет большего размера для более качественного оффлайн-обучения. Мы показали, как подойти к разработке нейронного регулятора и как протестировать его в модельном окружении Engee.