Сеть и потоковая передача
Julia предоставляет интерфейс с широкими возможностями для работы с объектами, осуществляющими потоковый ввод-вывод данных, такими как терминалы, каналы и сокеты TCP. Хотя этот интерфейс является асинхронным на уровне системы, для программиста он выглядит синхронным и беспокоиться о его внутренней асинхронной работе обычно не требуется. Это реализуется за счет интенсивного использования имеющихся в Julia функций совместной многопоточности (сопрограмм).
Базовый потоковый ввод-вывод
Для всех потоков в Julia доступны как минимум методы read
и write
, которые принимают название потока в качестве первого аргумента, например:
julia> write(stdout, "Hello World"); # Подавляет возврат значения 11 за счет «;»
Hello World
julia> read(stdin, Char)
'\n': ASCII/Unicode U+000a (category Cc: Other, control)
Обратите внимание, что команда write
возвращает 11 — число байтов (во фразе "Hello World"
), выводимых в поток stdout
, однако возврат этого значения подавляется с помощью ;
.
После этого вновь нажимается клавиша Enter, чтобы среда Julia считала переход на новую строку. Как видно из примера, write
принимает в качестве второго аргумента записываемые данные, а read
— тип считываемых данных.
Например, для считывания простого байтового массива можно сделать следующее.
julia> x = zeros(UInt8, 4)
4-element Array{UInt8,1}:
0x00
0x00
0x00
0x00
julia> read!(stdin, x)
abcd
4-element Array{UInt8,1}:
0x61
0x62
0x63
0x64
Однако это довольно громоздкая реализация, поэтому есть несколько более удобных методов. Например, процедуру выше можно записать следующим образом.
julia> read(stdin, 4)
abcd
4-element Array{UInt8,1}:
0x61
0x62
0x63
0x64
Если же мы хотим считать строку целиком, нужно следующее.
julia> readline(stdin)
abcd
"abcd"
Учтите, что в зависимости от настроек терминала у вас может осуществляться буферизация строк в TTY, поэтому может потребоваться еще раз нажать Enter, чтобы отправить данные в Julia.
for line in eachline(stdin)
print("Found $line")
end
А для чтения отдельных символов можно использовать read
.
while !eof(stdin)
x = read(stdin, Char)
println("Found: $x")
end
Текстовый ввод-вывод
Обратите внимание, что вышеуказанный метод write
работает с потоками двоичных данных. В частности, значения не преобразуются в какое-либо каноническое текстовое представление, а выводятся как есть.
julia> write(stdout, 0x61); # Подавляет возврат значения 1 за счет «;»
a
Здесь функция write
выводит a
в потоке stdout
, а ее возвращаемым значением является 1
(так как размер 0x61
— один байт).
Для текстового ввода-вывода используйте в зависимости от необходимости метод print
или show
(подробное описание разницы между ними см. в их документации).
julia> print(stdout, 0x61)
97
Дополнительные сведения о реализации методов отображения для пользовательских типов см. в разделе Настраиваемая структурная распечатка программного кода.
Контекстные свойства вывода
Иногда для вывода бывает полезно передавать методам отображения контекстные данные. В качестве платформы для связи произвольных метаданных с объектом ввода-вывода используют объект IOContext
. Например, :compact => true
добавляет в объект ввода-вывода параметр с подсказкой о том, что вызываемый метод отображения должен выводить сокращенные данные (если применимо). В документации по IOContext
приведен список часто используемых свойств.
Работа с файлами
Как и во многих других средах, в Julia есть функция open
, принимающая имя файла и возвращающая объект IOStream
, который можно использовать для чтения и записи данных в файле. Допустим, у нас есть файл hello.txt
, содержащий строку Hello, World!
.
julia> f = open("hello.txt")
IOStream(<file hello.txt>)
julia> readlines(f)
1-element Array{String,1}:
"Hello, World!"
Чтобы записать что-то в файл, вы можете открыть его с флагом записи ("w"
).
julia> f = open("hello.txt","w")
IOStream(<file hello.txt>)
julia> write(f,"Hello again.")
12
Если на этом этапе посмотреть, что содержится в hello.txt
, вы увидите, что файл пуст, потому что на диск еще ничего не записывалось. Чтобы записанные данные сохранились на диске, необходимо закрыть IOStream
.
julia> close(f)
Если вы вновь просмотрите hello.txt
, вы увидите, что его содержимое изменилось.
Открытие файла, взаимодействие с его содержимым, а затем закрытие — очень частый шаблон работы. Для его упрощения существует другой вариант вызова open
, который принимает два аргумента: функцию и имя файла. Он открывает файл, вызывает функцию с файлом в качестве аргумента, а затем файл вновь закрывается. Например, если имеется такая функция:
function read_and_capitalize(f::IOStream)
return uppercase(read(f, String))
end
Вы можете сделать следующий вызов.
julia> open(read_and_capitalize, "hello.txt")
"HELLO AGAIN."
Будет открыт файл hello.txt
, для него будет вызвана read_and_capitalize
, а затем hello.txt
будет закрыт и будет возвращено его содержимое, написанное заглавными буквами.
Вы можете даже не определять именованную функцию, а использовать синтаксис do
, который создает анонимную функцию во время выполнения.
julia> open("hello.txt") do f
uppercase(read(f, String))
end
"HELLO AGAIN."
Простой пример использования TCP
Перейдем сразу же к простому примеру работы с TCP-сокетами. Данный функционал входит в пакет Sockets
из стандартной библиотеки. Сначала создадим простой сервер.
julia> using Sockets
julia> errormonitor(@async begin
server = listen(2000)
while true
sock = accept(server)
println("Hello World\n")
end
end)
Task (runnable) @0x00007fd31dc11ae0
Названия методов будут знакомы тем, кто работал с API сокетов Unix, хотя использование этих методов несколько проще. Первый вызов listen
создает сервер, ожидающий входящих подключений на указанном в данном случае порте (2000). Эту же функцию можно использовать для создания других различных видов серверов.
julia> listen(2000) # Прослушивает localhost:2000 (IPv4)
Sockets.TCPServer(active)
julia> listen(ip"127.0.0.1",2000) # Эквивалентен первому
Sockets.TCPServer(active)
julia> listen(ip"::1",2000) # Прослушивает localhost:2000 (IPv6)
Sockets.TCPServer(active)
julia> listen(IPv4(0),2001) # Прослушивает порт 2001 на всех интерфейсах IPv4
Sockets.TCPServer(active)
julia> listen(IPv6(0),2001) # Прослушивает порт 2001 на всех интерфейсах IPv6
Sockets.TCPServer(active)
julia> listen("testsocket") # Прослушивает сокет домена UNIX
Sockets.PipeServer(active)
julia> listen("\\.\\pipe\\testsocket") # Прослушивает именованный канал Windows
Sockets.PipeServer(active)
Учтите, что последний вызов имеет другой тип возвращаемого значения. Это связано с тем, что сервер осуществляет прослушивание не по протоколу TCP, а по именованному каналу (Windows) или сокету домена UNIX. Также примите во внимание, что именованный канал Windows имеет особый формат, в котором префикс имени (\\.\pipe\
) содержит уникальный идентификатор типа файла. Отличие TCP от именованных каналов и сокетов домена UNIX является неочевидным и связано с методами accept
и connect
. Метод accept
получает подключение к клиенту с созданного сервера, тогда как функция connect
подключается к серверу с применением указанного метода. Функция connect
принимает те же аргументы, что и listen
, поэтому если среда (т. е. узел, cwd и т. д.) такая же, то вы можете передать connect
те же аргументы, что и для прослушивания, чтобы установить подключение. Попробуем сделать это (после создания сервера выше).
julia> connect(2000)
TCPSocket(open, 0 bytes waiting)
julia> Hello World
Как и ожидалось, выводится «Hello World». Давайте проанализируем, что происходит за кулисами. При вызове connect
мы подключаемся к только что созданному серверу. Функция accept возвращает подключение к только что созданному сокету со стороны сервера и выводит «Hello World», показывая, что подключение установлено.
Важным преимуществом Julia является то, что API предоставляется в синхронном режиме, хотя ввод и вывод на самом деле происходят асинхронно, поэтому нам не нужно беспокоиться об обратных вызовах и даже проверять запуск сервера. При вызове connect
текущая задача дожидается установки подключения и лишь после этого продолжает выполнение. Во время этой приостановки возобновляется выполнение задачи сервера (поскольку теперь доступен запрос на подключение). Она принимает подключение, выводит сообщение и ожидает следующего клиента. Чтение и запись работают аналогичным образом. Рассмотрим это на примере простого эхосервера.
julia> errormonitor(@async begin
server = listen(2001)
while true
sock = accept(server)
@async while isopen(sock)
write(sock, readline(sock, keep=true))
end
end
end)
Task (runnable) @0x00007fd31dc12e60
julia> clientside = connect(2001)
TCPSocket(RawFD(28) open, 0 bytes waiting)
julia> errormonitor(@async while isopen(clientside)
write(stdout, readline(clientside, keep=true))
end)
Task (runnable) @0x00007fd31dc11870
julia> println(clientside,"Hello World from the Echo Server")
Hello World from the Echo Server
Как и для других потоков, используйте close
, чтобы отключить сокет.
julia> close(clientside)
Разрешение IP-адресов
Одним из методов connect
, который не следует за методами listen
, является connect(host::String,port)
, который пытается подключиться к узлу, задаваемому параметром host
, на порте, задаваемом параметром port
. Он позволяет делать вещи наподобие следующих.
julia> connect("google.com", 80)
TCPSocket(RawFD(30) open, 0 bytes waiting)
В основе этого функционала лежит метод getaddrinfo
, осуществляющий необходимое разрешение адресов.
julia> getaddrinfo("google.com")
ip"74.125.226.225"
Асинхронный ввод-вывод
Все операции ввода-вывода, доступные через Base.read
и Base.write
, можно выполнять асинхронно, используя сопрограммы. Вы можете создать сопрограмму для считывания или записи данных в потоке с применением макроса @async
.
julia> task = @async open("foo.txt", "w") do io
write(io, "Hello, World!")
end;
julia> wait(task)
julia> readlines("foo.txt")
1-element Array{String,1}:
"Hello, World!"
Нередко возникают ситуации, когда необходимо выполнять множество асинхронных операций одновременно и дожидаться, пока все они завершатся. Чтобы заблокировать программу до выхода из всех обернутых в ней сопрограмм, можно использовать макрос @sync
.
julia> using Sockets
julia> @sync for hostname in ("google.com", "github.com", "julialang.org")
@async begin
conn = connect(hostname, 80)
write(conn, "GET / HTTP/1.1\r\nHost:$(hostname)\r\n\r\n")
readline(conn, keep=true)
println("Finished connection to $(hostname)")
end
end
Finished connection to google.com
Finished connection to julialang.org
Finished connection to github.com
Мультивещание
В Julia поддерживается мультивещание IPv4 и IPv6 с использованием транспортного протокола UDP.
В отличие от протокола TCP UDP не делает практически никаких предположений о потребностях приложения. Протокол TCP обеспечивает управление потоком (ускорение и замедление передачи для получения максимальной пропускной способности), надежность (автоматическую повторную передачу потерянных или поврежденных пакетов), последовательность (упорядочение пакетов операционной системой перед отправкой их приложению), задание размера сегментов, а также настройку и разъединение сеанса. В протоколе UDP нет таких функций.
Протокол UDP обычно используется в приложениях с мультивещанием. TCP — это протокол с отслеживанием состояния для связи строго между двумя устройствами. В UDP можно использовать особые адреса многоадресной рассылки для одновременного взаимодействия между множеством устройств.
Получение IP-пакетов мультивещания
Для передачи данных с помощью многоадресной рассылки UDP просто используйте recv
для сокета; будет возвращен первый же полученный пакет. Однако учтите, что это не обязательно будет первый отправленный пакет!
using Sockets group = ip"228.5.6.7" socket = Sockets.UDPSocket() bind(socket, ip"0.0.0.0", 6789) join_multicast_group(socket, group) println(String(recv(socket))) leave_multicast_group(socket, group) close(socket)
Отправка IP-пакетов мультивещания
Для передачи данных с помощью мультивещания UDP просто используйте send
для сокета. Обратите внимание, что присоединять отправителя к группе многоадресной рассылки не требуется.
using Sockets group = ip"228.5.6.7" socket = Sockets.UDPSocket() send(socket, group, 6789, "Hello over IPv4") close(socket)
Пример для IPv6
В следующем примере показан тот же функционал, что и в предыдущей программе, однако в качестве протокола сетевого уровня используется IPv6.
Получатель:
using Sockets group = Sockets.IPv6("ff05::5:6:7") socket = Sockets.UDPSocket() bind(socket, Sockets.IPv6("::"), 6789) join_multicast_group(socket, group) println(String(recv(socket))) leave_multicast_group(socket, group) close(socket)
Отправитель:
using Sockets group = Sockets.IPv6("ff05::5:6:7") socket = Sockets.UDPSocket() send(socket, group, 6789, "Hello over IPv6") close(socket)