Классификация Радиолокационных сигналов
Классификация сигналов радиолокации с использованием глубокого обучения
Введение
В этом примере показано, как классифицировать сигналы радаров с помощью глубокого обучения.
Классификация модуляции — важная функция интеллектуального приёмника. Классификация модуляции имеет множество применений, например, в когнитивных радарах и программно-определяемых радиосистемах. Как правило, для идентификации этих сигналов и их классификации по типу модуляции необходимо определить значимые признаки и ввести их в классификатор. В этом примере рассматривается автоматическое извлечение частотно-временных признаков из сигналов и их классификации с помощью сети глубокого обучения.
Первая часть этого примера моделирует систему классификации радиолокационных сигналов, которая синтезирует три импульсных сигнала и классифицирует их. Форма сигналов радиолокационных сигналов:
-
Прямоугольный
-
Линейная частотная модуляция (ЛЧМ)
-
Код Баркера
Подготовка
Установка необходимых пакетов
include("PackagesHelper.jl")
using Pkg
Pkg.instantiate()
using CategoricalArrays
using OneHotArrays
using BSON: @save, @load
using Random, Statistics, FFTW, DSP, Images, ImageIO, ImageTransformations, FileIO
using Flux, Metalhead, MLUtils, CUDA, StatsBase, StatisticalMeasures
include("utils.jl");
Зададим определенные константы, связанные с устройством, на котором будет обучаться модель, разрешением изображений, параметров для генерации данных и тд.
Random.seed!(42)
device == gpu ? model = gpu(model) : nothing;
IMG_SZ=(224,224)
OUT="tfd_db"
radar_classes=["Barker","LFM","Rect"]
CLASSES = 1 : length(radar_classes)
N_PER_CLASS=600
train_ratio,val_ratio,test_ratio = 0.8,0.1,0.1
DATA_MEAN=(0.485f0,0.456f0,0.406f0)
DATA_STD=(0.229f0,0.224f0,0.225f0)
Сгенерируем набор данных, которые представляют собой сигналы трех типов. Функция helperGenerateRadarWaveforms
создаёт синтетический датасет радиосигналов трёх типов модуляции: прямоугольные импульсы, LFM-чирпы и коды Баркера. Для каждого сигнала случайно выбираются параметры (несущая, полоса, длина, направление модуляции и т.п.), после чего к нему добавляется шум, частотный сдвиг и искажения. На выходе функция возвращает список комплексных последовательностей и список меток с типом модуляции для каждого сигнала.
data, truth = helperGenerateRadarWaveforms(Fs=1e8, nSignalsPerMod=3000, seed=0)
Далее превращаем набор радиосигналов в изображения для последующего обучения нейросети. Сначала функция tfd_image_gray
строит спектрограмму сигнала, переводит её в логарифмическую шкалу, нормализует значения и формирует картинку в градациях серого. Затем функция save_dataset_as_tfd_images_splits
принимает список сигналов и их метки классов, случайным образом делит их на обучающую, валидационную и тестовую части, создаёт нужную структуру папок и сохраняет для каждого сигнала соответствующее PNG-изображение спектрограммы.
save_dataset_as_tfd_images_splits(data, truth; Fs=1e8, outdir="tfd_db", img_sz=(224,224), ratios=(0.8,0.1,0.1), seed=0)
Визуализируем по одному сигналу каждого типа
img1 = Images.load(raw"tfd_db/val/Rect/4283146180.png")
img2 = Images.load(raw"tfd_db/val/Barker/3375303598.png")
img3 = Images.load(raw"tfd_db/val/LFM/3736510008.png")
[img1 img2 img3]
Далее опишем структуру, которая задает датасет для обучения модели. Функция create_dataset
принимает пути изображений, загружает их в память, а затем применяет к ним преобразования, такие как ресайз, перестановка осей в том порядке, в котором требует FLux
function Augment_func(img)
resized = imresize(img, 224, 224)
rgb = RGB.(resized)
ch = channelview(rgb)
x = permutedims(ch, (3,2,1))
Float32.(x)
end
function Create_dataset(path)
img_train = []
img_test = []
img_valid = []
label_train = []
label_test = []
label_valid = []
train_path = joinpath(path, "train");
test_path = joinpath(path, "test");
valid_path = joinpath(path, "val");
function process_directory(directory, img_array, label_array, label_idx)
for file in readdir(directory)
if endswith(file, ".jpg") || endswith(file, ".png")
file_path = joinpath(directory, file);
img = Images.load(file_path);
img = Augment_func(img);
push!(img_array, img)
push!(label_array, label_idx)
end
end
end
for (idx, label) in enumerate(readdir(train_path))
println("Processing label in train: ", label)
label_dir = joinpath(train_path, label)
process_directory(label_dir, img_train, label_train, idx);
end
for (idx, label) in enumerate(readdir(test_path))
println("Processing label in test: ", label)
label_dir = joinpath(test_path, label)
process_directory(label_dir, img_test, label_test, idx);
end
for (idx, label) in enumerate(readdir(valid_path))
println("Processing label in valid: ", label)
label_dir = joinpath(valid_path, label)
process_directory(label_dir, img_valid, label_valid, idx);
end
return img_train, img_test, img_valid, label_train, label_test, label_valid;
end;
Создадим датасет
img_train, img_test, img_valid, label_train, label_test, label_valid = Create_dataset("tfd_db");
Создадим загрузчики данных
train_loader = DataLoader((data=img_train, label=label_train), batchsize=64, shuffle=true, collate=true)
test_loader = DataLoader((data=img_test, label=label_test), batchsize=64, shuffle=false, collate=true)
valid_loader = DataLoader((data=img_valid, label=label_valid), batchsize=64, shuffle=false, collate=true)
Далее опишем функции для тренировки и валидации модели
function train!(model, train_loader, opt, loss_fn, device, epoch::Int, num_epochs::Int)
Flux.trainmode!(model)
running_loss = 0.0
n_batches = 0
for (data, label) in train_loader
x = device(data)
yoh = Flux.onehotbatch(label, CLASSES) |> device
loss_val, gs = Flux.withgradient(Flux.params(model)) do
ŷ = model(x)
loss_fn(ŷ, yoh)
end
Flux.update!(opt, Flux.params(model), gs)
running_loss += Float64(loss_val)
n_batches += 1
end
return opt, running_loss / max(n_batches, 1)
end
function validate(model, val_loader, loss_fn, device)
Flux.testmode!(model)
running_loss = 0.0
n_batches = 0
for (data, label) in train_loader
x = device(data)
yoh = Flux.onehotbatch(label, CLASSES) |> device
ŷ = model(x)
loss_val = loss_fn(ŷ, yoh)
running_loss += Float64(loss_val)
n_batches += 1
end
Flux.trainmode!(model)
return running_loss / max(n_batches, 1)
end
Инициализируем модель SqueezeNet
model = SqueezeNet(pretrain=false, nclasses=length(radar_classes))
model = gpu(model);
Зададим функцию потерь, оптимизатор и скорость обучения
lr = 0.0001
lossFunction(x, y) = Flux.Losses.logitcrossentropy(x, y);
opt = Flux.Adam(lr, (0.9, 0.99));
Classes = 1:length(radar_classes);
Запустим цикл обучения модели
no_improve_epochs = 0
best_model = nothing
train_losses = [];
valid_losses = [];
best_val_loss = Inf;
num_epochs = 50
for epoch in 1:num_epochs
println("-"^50 * "\n")
println("EPOCH $(epoch):")
opt, train_loss = train!(
model, train_loader, opt,
lossFunction, gpu, 1, num_epochs
)
val_loss = validate(model, valid_loader, lossFunction, gpu)
if val_loss < best_val_loss
best_val_loss = val_loss
best_model = deepcopy(model)
end
println("Epoch $epoch/$num_epochs | train $(round(train_loss, digits=4)) | val $(round(val_loss, digits=4))")
push!(train_losses, train_loss)
push!(valid_losses, val_loss)
end
Сохраним обученную модель
model = cpu(model)
@save "$(@__DIR__)/models/modelCLSRadarSignal.bson" model
Тестирование модели
Выполним инференс, а также построим матрицу ошибок
model_data = load("$(@__DIR__)/models/modelCLSRadarSignal.bson")
model = model_data[:model] |> gpu;
Напишем функцию для оценки обученной модели, которая вычисляет метрику Accuracy
total_loss, correct_predictions, total_samples = 0.0, 0, 0
all_preds = []
True_labels = []
for (data, label) in enumerate(test_loader)
x = gpu(data)
yoh = Flux.onehotbatch(label, CLASSES) |> gpu
ŷ = model(x)
total_loss= loss_fn(ŷ, yoh)
# y_pred = model(imgs, feat)
# total_loss += Flux.Losses.logitcrossentropy(y_pred, onehotbatch(y, classes))
preds = onecold(ŷ, classes)
true_classes = y
append!(all_preds, preds)
append!(True_labels, true_classes)
correct_predictions += sum(preds .== true_classes)
total_samples += length(y)
end
accuracy = 100.0 * correct_predictions / total_samples
accuracy_score, all_preds, true_predS = evaluate_model_accuracy(test_loader, model, CLASSES);
println("Accuracy trained model:", accuracy_score, "%")
А также протестируем на конкретном объекте
classes = readdir("tfd_db/train")
cls = rand(classes)
files = readdir(joinpath("tfd_db/train", cls))
f = rand(files)
path = joinpath("tfd_db/train", cls, f)
img = Images.load(path)
img = Augment_func(img)
img = reshape(img, size(img)..., 1)
ŷ = model(gpu(img))
probs = Flux.softmax(ŷ)
pred_idx = argmax(Array(probs))
pred_class = radar_classes[pred_idx]
println("Истинный класс: ", cls)
println("Предсказанный класс: ", pred_class)
Заключение
В данном демо-примере была обучена сверточная нейронная сеть для классификации радиолокационных сигналов.
Результаты обучения имеют хорошие показатели