Классификация LPI радаров
Введение
Системы LPI-радара (Low Probability of Intercept) разработаны для уклонения от обнаружения за счёт использования специальных схем модуляции и передачи на широких частотных диапазонах с низким уровнем мощности. В современных LPI-радарах эти условия обычно достигаются передачей непрерывно-волнового (CW) сигнала. Обнаружение и классификация LPI-сигналов представляет собой серьёзную задачу. Недавние достижения показывают, что методы глубокого обучения могут успешно применяться для классификации радарных сигналов.
В этом примере показано, как классифицировать LPI-радарные сигналы с помощью нейронной сети. Перед подачей данных на вход ИНС они претерпевают обработку - из комплексного сигнала получаем данные во временно-частотной области, после которой при помощи обработки изображений получаем бинарную спектрограмму и вектор признаков, полученный с помощью HoG.
Подробнее о предобработке данных ниже
Сама нейросеть состоит из двух ветвей. Первая ветвь - двумерная сверточная сеть, на вход которой подается бинарная спектрограмма. Вторая ветвь нейросети - одномерная сверточная сеть, на вход которой подается вектор признаков, извлеченных с помощью HoG из бинарной спектрограммы. Данные фичи на выходе двух ветвей объединяются и подаются в голову ИНС - 2 полносвязных слоя, выдающие логиты. Ну и потом с помощью кросэнтропии вычисляются предсказанные метки классов для сигнала.
Подготовка к работе
Импортируем необходимые пакеты
include("installPackages.jl");
using Pkg
Pkg.instantiate()
using SignalAnalysis
using DSP
using MAT
using Plots
using Images, ColorSchemes, Random
using ImageBinarization, ImageFiltering, ImageMorphology, ImageFeatures
using Flux, CUDA, cuDNN
using OneHotArrays
using StatisticalMeasures
include("show_sample.jl");
include("utils.jl");
CM = StatisticalMeasures.ConfusionMatrices;
Зададим параметры, которые будут использоваться в данной работе
n = 64
hop=48
Fs = 20e6
nfft = 256;
threshold = 210 / 255
datadir = "data";
n_classes = 11;
lr = 1e-3
CLASSES = 1:n_classes;
Обзор данных
Проведем предобработку для одного выбранного сигнала
Для начала загрузим и прочитаем из файла сигнал. data - сигнал в комплексной форме
testpath = joinpath(datadir, "Frank")
files = readdir(testpath)
n_files = length(files)
ind = Random.rand(1:n_files)
filepath = joinpath(testpath, files[ind])
data = matread(filepath)["x"][:, 1];
Функция show_sample вычисляет спектрограмму на сигнале, используя параметры для STFT, которые мы задали выше
z, fig = show_sample(data, n, hop, Fs, nfft);
fig
Функция getImage позволяет из рассчитанной спектрограммы получить изображение в формате RGB. Ну а getBinary вычисляет с заданным порогом бинарное изображение полученной спектрограммы. Результаты визуализированы ниже
img = getImage(z)
bnr_img = getBinary(img, threshold)
[img bnr_img]
Далее полученную бинарную спектрограмму подадим на HoG для извлечения признаков. HoG рассчитывает гистограмму ориентаций градиентов (направлений изменений яркости). На выходе метода получаем вектор признаков, которые можно подать на вход одномерной сверточной сети.
В целом, это вектор можно и визуализировать. Высокие столбцы означают, что в соответствующей ячейке сильно выражена граница под данным углом; нулевые или почти нулевые значения — что градиенты этого направления в ячейке отсутствуют.
featureHoG = calcHoG(bnr_img)
plot(featureHoG, legend=false, xticks=false, yticks=false)
Далее создадим структуру датасета, который итеративно для каждого сигнала будет выполнять весь проделанный выше пайплайн. Помимо этого, разобьем все данные на три набора - тренировочный, валидационный и тестовый в стандартных пропорциях. Также создадим загрузчики данных с размером батча 64.
subfolders = readdir(datadir)
classes = subfolders
paths = String[]
labels = Int[]
for (cls_idx, sub) in enumerate(classes)
folder = joinpath(datadir, sub)
for fname in readdir(folder)
push!(paths, joinpath(folder, fname))
push!(labels, cls_idx)
end
end
N = length(paths)
perm = randperm(N)
paths = paths[perm]
labels = labels[perm]
N = length(paths)
train_len = floor(Int, 0.7 * N)
val_len = floor(Int, 0.15 * N)
test_len = N - train_len - val_len
train_idx = 1:train_len
val_idx = (train_len+1):(train_len+val_len)
test_idx = (train_len+val_len+1):N
train_paths, train_labels = paths[train_idx], labels[train_idx]
val_paths, val_labels = paths[val_idx], labels[val_idx]
test_paths, test_labels = paths[test_idx], labels[test_idx]
struct LazyRadarDataseT
paths::Vector{String}
labels::Vector{Int}
n; hop; Fs; nfft; threshold
end
Base.getindex(ds::LazyRadarDataseT, i::Int) = begin
data = matread(ds.paths[i])["x"][:,1]
b, f = prepareSample(data, ds.n, ds.hop, ds.Fs, ds.nfft, ds.threshold)
return Float16.(b), Float16.(f), ds.labels[i]
end
function Base.getindex(ds::LazyRadarDataseT, idxs::AbstractVector{Int})
B = length(idxs)
b0, f0, _ = ds[idxs[1]]
nb, Tb = size(b0)
Tf = length(f0)
batch_b = Array{Float16}(undef, nb, Tb, 1, B)
batch_f = Array{Float16}(undef, Tf, 1, B)
batch_labels = Vector{Int}(undef, B)
for (j, i) in enumerate(idxs)
bj, fj, lbl = ds[i]
batch_b[:, :, :, j] = bj
batch_f[:,:, j] = fj
batch_labels[j] = lbl
end
return batch_b, batch_f, batch_labels
end
Base.length(ds::LazyRadarDataseT) = length(ds.paths)
train_ds = LazyRadarDataseT(train_paths, train_labels, n, hop, Fs, nfft, threshold)
val_ds = LazyRadarDataseT(val_paths, val_labels, n, hop, Fs, nfft, threshold)
test_ds = LazyRadarDataseT(test_paths, test_labels, n, hop, Fs, nfft, threshold);
# --- 4) Оборачиваем их в DataLoader — Flux на лету вытянет нужные элементы по индексу
batch_size = 128
train_loader = Flux.DataLoader(train_ds;
batchsize = batch_size,
shuffle = true,
partial = true,
parallel = true,
)
val_loader = Flux.DataLoader(val_ds; batchsize=batch_size, shuffle=true)
test_loader = Flux.DataLoader(test_ds; batchsize=batch_size, shuffle=true)
Определим девайс, на котором будет выполняться обучение сети
device = CUDA.functional() ? gpu : cpu
Определим архитектуру нейронной сети, состоящей из двух ветвей и головы
function makeBranch2dCnn(; p = 0.5)
Chain(
Conv((5,5), 1=>64, pad=1), relu,
x -> maxpool(x, (2,2)),
Conv((5,5), 64=>128, pad=1), relu,
x -> maxpool(x, (2,2)),
Conv((5,5), 128=>128, pad=1), relu,
x -> maxpool(x, (2,2)),
Conv((5,5), 128=>256, pad=1), relu,
x -> maxpool(x, (2,2)),
Conv((5,5), 256=>512, pad=1), relu,
Dropout(p),
x -> maxpool(x, (2,2)),
)
end
function makeBranch1dCnn(; p = 0.25)
Chain(
Conv((5,), 1=>8, pad=1), relu,
x -> maxpool(x, (2,)),
Conv((5,), 8=>64, pad=1), relu,
x -> maxpool(x, (2,)),
)
end
function makeHead()
head = Chain(
t -> begin
x1, x2 = t
v1 = reshape(x2, :, size(x2, 3))
v2 = reshape(x1, :, size(x1, 4))
return vcat(v1, v2)
end,
Dense(432576, 128, relu),
Dense(128, n_classes)
);
end
struct Net
branch1::Chain
branch2::Chain
head::Chain
end
function Net()
cnn2d = makeBranch2dCnn();
cnn1d = makeBranch1dCnn();
head = makeHead()
Net(cnn2d, cnn1d, head)
end
function (m::Net)(imgs, feat)
out2d = m.branch1(imgs)
out1d = m.branch2(feat)
return m.head((out2d, out1d))
end
Flux.@functor Net
Инициализируем экземпляр сети и переведем ее на соответствующий девайс
model = Net()
device == gpu ? model = gpu(model) : nothing;
Инициализируем оптимизатор с заданной скоростью обучения
optimizer = Flux.Adam(1e-4, (0.9, 0.99));
Определим функции для тренировки и валидации на одной эпохе
lossSnet(imgs, features, y) = Flux.Losses.logitcrossentropy(Net(imgs, features), y);
function train_one_epoch(model, Loader, correct_sample, TSamples, Tloss, Optimizer, device)
for (i, (imgs, feat, y)) in enumerate(Loader)
# println("Iteration: ", i)
TSamples += length(y)
imgs = gpu(imgs)
feat = gpu(feat)
y = gpu(y)
wg = Flux.withgradient(Flux.params(model)) do
ŷ = model(imgs, feat)
l = Flux.Losses.logitcrossentropy(ŷ, onehotbatch(y, CLASSES))
return l, ŷ # <- как в ваших примерах: два значения
end
batch_loss, y_pred = wg.val
Flux.update!(Optimizer, Flux.params(model), wg.grad)
Tloss += batch_loss
correct_sample += sum(onecold(y_pred, CLASSES) .== y)
end
return Tloss, TSamples, correct_sample
end;
function validate(model, val_loader, task, device, correct_sample, TSamples, running_loss)
preds = nothing
targets = Int[] # пустой вектор Int
# n_batches = 0
for (i, (imgs, feat, y)) in enumerate(val_loader)
TSamples += length(y)
imgs = gpu(imgs)
feat = gpu(feat)
y = gpu(y)
ŷ = model(imgs, feat)
running_loss += Flux.Losses.logitcrossentropy(ŷ, onehotbatch(y, CLASSES))
# n_batches += 1
correct_sample += sum(onecold(ŷ, CLASSES) .== y)
end
# avg_loss = running_loss / max(n_batches, 1)
epoch_acc = 100.0 * (correct_sample / TSamples)
epoch_loss = running_loss / TSamples
return epoch_loss, preds, targets, epoch_acc
end
Запустим цикл обучения сети
for epoch in 1:10
total_loss = 0.0
train_running_correct = 0
total_samples = 0
correct_sample_val = 0
TSamples_val = 0
running_loss_val = 0
@info "Epoch $epoch"
@info "Тренировка"
Tloss, TSamples, correct_sample = train_one_epoch(model, train_loader, train_running_correct,
total_samples, total_loss, optimizer, device)
@info "Валидация"
avg_loss, preds, targets, epoch_acc_valid = validate(model, val_loader, "val", device, correct_sample_val, TSamples_val, running_loss_val)
epoch_loss = Tloss / TSamples
epoch_acc = 100.0 * (correct_sample / TSamples)
println("loss: $epoch_loss, accuracy: $epoch_acc, val_loss:$avg_loss, val_acc:$epoch_acc_valid")
end
Оценим модель
function evaluate_model_accuracy(loader, model, classes)
total_loss, correct_predictions, total_samples = 0.0, 0, 0
all_preds = []
True_labels = []
for (i, (imgs, feat, y)) in enumerate(loader)
imgs = gpu(imgs)
feat = gpu(feat)
y = gpu(y)
y_pred = model(imgs, feat)
total_loss += Flux.Losses.logitcrossentropy(y_pred, onehotbatch(y, classes))
preds = onecold(y_pred, classes)
true_classes = y
append!(all_preds, preds)
append!(True_labels, true_classes)
correct_predictions += sum(preds .== true_classes)
total_samples += length(y)
end
# Вычисление точности
accuracy = 100.0 * correct_predictions / total_samples
return accuracy, all_preds, True_labels
end;
Оценим нашу модель, вычислив точность на тестовом наборе данных
accuracy_score_LSTM, all_predsLSTMm, true_predS_LSTM = evaluate_model_accuracy(test_loader, model, CLASSES);
println("Accuracy trained model:", accuracy_score_LSTM, "%")
Теперь построим матрицу ошибок для визуальной оценки модели
function plot_confusion_matrix(C)
# Создаем heatmap с большими шрифтами и контрастными цветами
heatmap(
C,
title = "Confusion Matrix",
xlabel = "Predicted",
ylabel = "True",
xticks = (1:length(classes), classes),
yticks = (1:length(classes), classes),
c = :viridis, # Более контрастная цветовая схема
colorbar_title = "Count",
size = (600, 400)
)
# Добавим значения в ячейки для улучшенной наглядности
for i in 1:size(C, 1)
for j in 1:size(C, 2)
annotate!(j, i, text(C[i, j], :white, 12, :bold))
end
end
# Явно отображаем график
display(current())
end;
preds_for_CM_LSTM = map(x -> x[1], all_predsLSTMm);
conf_matrix = CM.confmat(preds_for_CM_LSTM, true_predS_LSTM)
conf_matrix = CM.matrix(conf_matrix)
plot_confusion_matrix(conf_matrix)
Дальнейшее улучшение модели предполагает более качественный выбор архитектуры, подбор оптимальных гиперпараметров
Заключение
В этом примере представлен рабочий процесс для выполнения классификации радиолокационных целей с использованием методов машинного обучения и глубокого обучения. Хотя в этом примере для обучения и тестирования использовались синтезированные данные, его можно легко расширить, чтобы учесть реальные результаты работы радара.
.png)