Документация Engee

Сеть и потоковая передача

Julia предоставляет интерфейс с широкими возможностями для работы с объектами, осуществляющими потоковый ввод-вывод данных, такими как терминалы, каналы и сокеты TCP. Хотя этот интерфейс является асинхронным на уровне системы, для программиста он выглядит синхронным и беспокоиться о его внутренней асинхронной работе обычно не требуется. Это реализуется за счет интенсивного использования имеющихся в Julia функций совместной многопоточности (сопрограмм).

Базовый потоковый ввод-вывод

Для всех потоков в Julia доступны как минимум методы read и write, которые принимают название потока в качестве первого аргумента, например:

julia> write(stdout, "Hello World");  # Подавляет возврат значения 11 за счет «;»
Hello World
julia> read(stdin, Char)

'\n': ASCII/Unicode U+000a (category Cc: Other, control)

Обратите внимание, что команда write возвращает 11 — число байтов (во фразе "Hello World"), выводимых в поток stdout, однако возврат этого значения подавляется с помощью ;.

После этого вновь нажимается клавиша Enter, чтобы среда Julia считала переход на новую строку. Как видно из примера, write принимает в качестве второго аргумента записываемые данные, а read — тип считываемых данных.

Например, для считывания простого байтового массива можно сделать следующее.

julia> x = zeros(UInt8, 4)
4-element Array{UInt8,1}:
 0x00
 0x00
 0x00
 0x00

julia> read!(stdin, x)
abcd
4-element Array{UInt8,1}:
 0x61
 0x62
 0x63
 0x64

Однако это довольно громоздкая реализация, поэтому есть несколько более удобных методов. Например, процедуру выше можно записать следующим образом.

julia> read(stdin, 4)
abcd
4-element Array{UInt8,1}:
 0x61
 0x62
 0x63
 0x64

Если же мы хотим считать строку целиком, нужно следующее.

julia> readline(stdin)
abcd
"abcd"

Учтите, что в зависимости от настроек терминала у вас может осуществляться буферизация строк в TTY, поэтому может потребоваться еще раз нажать Enter, чтобы отправить данные в Julia.

Для чтения каждой строки из stdin можно использовать eachline.

for line in eachline(stdin)
    print("Found $line")
end

А для чтения отдельных символов можно использовать read.

while !eof(stdin)
    x = read(stdin, Char)
    println("Found: $x")
end

Текстовый ввод-вывод

Обратите внимание, что вышеуказанный метод write работает с потоками двоичных данных. В частности, значения не преобразуются в какое-либо каноническое текстовое представление, а выводятся как есть.

julia> write(stdout, 0x61);  # Подавляет возврат значения 1 за счет «;»
a

Здесь функция write выводит a в потоке stdout, а ее возвращаемым значением является 1 (так как размер 0x61 — один байт).

Для текстового ввода-вывода используйте в зависимости от необходимости метод print или show (подробное описание разницы между ними см. в их документации).

julia> print(stdout, 0x61)
97

Дополнительные сведения о реализации методов отображения для пользовательских типов см. в разделе Настраиваемая структурная распечатка программного кода.

Контекстные свойства вывода

Иногда для вывода бывает полезно передавать методам отображения контекстные данные. В качестве платформы для связи произвольных метаданных с объектом ввода-вывода используют объект IOContext. Например, :compact => true добавляет в объект ввода-вывода параметр с подсказкой о том, что вызываемый метод отображения должен выводить сокращенные данные (если применимо). В документации по IOContext приведен список часто используемых свойств.

Работа с файлами

Как и во многих других средах, в Julia есть функция open, принимающая имя файла и возвращающая объект IOStream, который можно использовать для чтения и записи данных в файле. Допустим, у нас есть файл hello.txt, содержащий строку Hello, World!.

julia> f = open("hello.txt")
IOStream(<file hello.txt>)

julia> readlines(f)
1-element Array{String,1}:
 "Hello, World!"

Чтобы записать что-то в файл, вы можете открыть его с флагом записи ("w").

julia> f = open("hello.txt","w")
IOStream(<file hello.txt>)

julia> write(f,"Hello again.")
12

Если на этом этапе посмотреть, что содержится в hello.txt, вы увидите, что файл пуст, потому что на диск еще ничего не записывалось. Чтобы записанные данные сохранились на диске, необходимо закрыть IOStream.

julia> close(f)

Если вы вновь просмотрите hello.txt, вы увидите, что его содержимое изменилось.

Открытие файла, взаимодействие с его содержимым, а затем закрытие — очень частый шаблон работы. Для его упрощения существует другой вариант вызова open, который принимает два аргумента: функцию и имя файла. Он открывает файл, вызывает функцию с файлом в качестве аргумента, а затем файл вновь закрывается. Например, если имеется такая функция:

function read_and_capitalize(f::IOStream)
    return uppercase(read(f, String))
end

Вы можете сделать следующий вызов.

julia> open(read_and_capitalize, "hello.txt")
"HELLO AGAIN."

Будет открыт файл hello.txt, для него будет вызвана read_and_capitalize, а затем hello.txt будет закрыт и будет возвращено его содержимое, написанное заглавными буквами.

Вы можете даже не определять именованную функцию, а использовать синтаксис do, который создает анонимную функцию во время выполнения.

julia> open("hello.txt") do f
           uppercase(read(f, String))
       end
"HELLO AGAIN."

Простой пример использования TCP

Перейдем сразу же к простому примеру работы с TCP-сокетами. Данный функционал входит в пакет Sockets из стандартной библиотеки. Сначала создадим простой сервер.

julia> using Sockets

julia> errormonitor(@async begin
           server = listen(2000)
           while true
               sock = accept(server)
               println("Hello World\n")
           end
       end)
Task (runnable) @0x00007fd31dc11ae0

Названия методов будут знакомы тем, кто работал с API сокетов Unix, хотя использование этих методов несколько проще. Первый вызов listen создает сервер, ожидающий входящих подключений на указанном в данном случае порте (2000). Эту же функцию можно использовать для создания других различных видов серверов.

julia> listen(2000) # Прослушивает localhost:2000 (IPv4)
Sockets.TCPServer(active)

julia> listen(ip"127.0.0.1",2000) # Эквивалентен первому
Sockets.TCPServer(active)

julia> listen(ip"::1",2000) # Прослушивает localhost:2000 (IPv6)
Sockets.TCPServer(active)

julia> listen(IPv4(0),2001) # Прослушивает порт 2001 на всех интерфейсах IPv4
Sockets.TCPServer(active)

julia> listen(IPv6(0),2001) # Прослушивает порт 2001 на всех интерфейсах IPv6
Sockets.TCPServer(active)

julia> listen("testsocket") # Прослушивает сокет домена UNIX
Sockets.PipeServer(active)

julia> listen("\\.\\pipe\\testsocket") # Прослушивает именованный канал Windows
Sockets.PipeServer(active)

Учтите, что последний вызов имеет другой тип возвращаемого значения. Это связано с тем, что сервер осуществляет прослушивание не по протоколу TCP, а по именованному каналу (Windows) или сокету домена UNIX. Также примите во внимание, что именованный канал Windows имеет особый формат, в котором префикс имени (\\.\pipe\) содержит уникальный идентификатор типа файла. Отличие TCP от именованных каналов и сокетов домена UNIX является неочевидным и связано с методами accept и connect. Метод accept получает подключение к клиенту с созданного сервера, тогда как функция connect подключается к серверу с применением указанного метода. Функция connect принимает те же аргументы, что и listen, поэтому если среда (т. е. узел, cwd и т. д.) такая же, то вы можете передать connect те же аргументы, что и для прослушивания, чтобы установить подключение. Попробуем сделать это (после создания сервера выше).

julia> connect(2000)
TCPSocket(open, 0 bytes waiting)

julia> Hello World

Как и ожидалось, выводится «Hello World». Давайте проанализируем, что происходит за кулисами. При вызове connect мы подключаемся к только что созданному серверу. Функция accept возвращает подключение к только что созданному сокету со стороны сервера и выводит «Hello World», показывая, что подключение установлено.

Важным преимуществом Julia является то, что API предоставляется в синхронном режиме, хотя ввод и вывод на самом деле происходят асинхронно, поэтому нам не нужно беспокоиться об обратных вызовах и даже проверять запуск сервера. При вызове connect текущая задача дожидается установки подключения и лишь после этого продолжает выполнение. Во время этой приостановки возобновляется выполнение задачи сервера (поскольку теперь доступен запрос на подключение). Она принимает подключение, выводит сообщение и ожидает следующего клиента. Чтение и запись работают аналогичным образом. Рассмотрим это на примере простого эхосервера.

julia> errormonitor(@async begin
           server = listen(2001)
           while true
               sock = accept(server)
               @async while isopen(sock)
                   write(sock, readline(sock, keep=true))
               end
           end
       end)
Task (runnable) @0x00007fd31dc12e60

julia> clientside = connect(2001)
TCPSocket(RawFD(28) open, 0 bytes waiting)

julia> errormonitor(@async while isopen(clientside)
           write(stdout, readline(clientside, keep=true))
       end)
Task (runnable) @0x00007fd31dc11870

julia> println(clientside,"Hello World from the Echo Server")
Hello World from the Echo Server

Как и для других потоков, используйте close, чтобы отключить сокет.

julia> close(clientside)

Разрешение IP-адресов

Одним из методов connect, который не следует за методами listen, является connect(host::String,port), который пытается подключиться к узлу, задаваемому параметром host, на порте, задаваемом параметром port. Он позволяет делать вещи наподобие следующих.

julia> connect("google.com", 80)
TCPSocket(RawFD(30) open, 0 bytes waiting)

В основе этого функционала лежит метод getaddrinfo, осуществляющий необходимое разрешение адресов.

julia> getaddrinfo("google.com")
ip"74.125.226.225"

Асинхронный ввод-вывод

Все операции ввода-вывода, доступные через Base.read и Base.write, можно выполнять асинхронно, используя сопрограммы. Вы можете создать сопрограмму для считывания или записи данных в потоке с применением макроса @async.

julia> task = @async open("foo.txt", "w") do io
           write(io, "Hello, World!")
       end;

julia> wait(task)

julia> readlines("foo.txt")
1-element Array{String,1}:
 "Hello, World!"

Нередко возникают ситуации, когда необходимо выполнять множество асинхронных операций одновременно и дожидаться, пока все они завершатся. Чтобы заблокировать программу до выхода из всех обернутых в ней сопрограмм, можно использовать макрос @sync.

julia> using Sockets

julia> @sync for hostname in ("google.com", "github.com", "julialang.org")
           @async begin
               conn = connect(hostname, 80)
               write(conn, "GET / HTTP/1.1\r\nHost:$(hostname)\r\n\r\n")
               readline(conn, keep=true)
               println("Finished connection to $(hostname)")
           end
       end
Finished connection to google.com
Finished connection to julialang.org
Finished connection to github.com

Мультивещание

В Julia поддерживается мультивещание IPv4 и IPv6 с использованием транспортного протокола UDP.

В отличие от протокола TCP UDP не делает практически никаких предположений о потребностях приложения. Протокол TCP обеспечивает управление потоком (ускорение и замедление передачи для получения максимальной пропускной способности), надежность (автоматическую повторную передачу потерянных или поврежденных пакетов), последовательность (упорядочение пакетов операционной системой перед отправкой их приложению), задание размера сегментов, а также настройку и разъединение сеанса. В протоколе UDP нет таких функций.

Протокол UDP обычно используется в приложениях с мультивещанием. TCP — это протокол с отслеживанием состояния для связи строго между двумя устройствами. В UDP можно использовать особые адреса многоадресной рассылки для одновременного взаимодействия между множеством устройств.

Получение IP-пакетов мультивещания

Для передачи данных с помощью многоадресной рассылки UDP просто используйте recv для сокета; будет возвращен первый же полученный пакет. Однако учтите, что это не обязательно будет первый отправленный пакет!

using Sockets
group = ip"228.5.6.7"
socket = Sockets.UDPSocket()
bind(socket, ip"0.0.0.0", 6789)
join_multicast_group(socket, group)
println(String(recv(socket)))
leave_multicast_group(socket, group)
close(socket)

Отправка IP-пакетов мультивещания

Для передачи данных с помощью мультивещания UDP просто используйте send для сокета. Обратите внимание, что присоединять отправителя к группе многоадресной рассылки не требуется.

using Sockets
group = ip"228.5.6.7"
socket = Sockets.UDPSocket()
send(socket, group, 6789, "Hello over IPv4")
close(socket)

Пример для IPv6

В следующем примере показан тот же функционал, что и в предыдущей программе, однако в качестве протокола сетевого уровня используется IPv6.

Получатель:

using Sockets
group = Sockets.IPv6("ff05::5:6:7")
socket = Sockets.UDPSocket()
bind(socket, Sockets.IPv6("::"), 6789)
join_multicast_group(socket, group)
println(String(recv(socket)))
leave_multicast_group(socket, group)
close(socket)

Отправитель:

using Sockets
group = Sockets.IPv6("ff05::5:6:7")
socket = Sockets.UDPSocket()
send(socket, group, 6789, "Hello over IPv6")
close(socket)