Compare commits

...

27 Commits

Author SHA1 Message Date
1f72d4dc70 ох уж этот редми 2024-10-10 21:03:09 +04:00
b351431f51 lab4 now is ready 2024-10-10 21:02:25 +04:00
56baf52b61 lab4 ready 2024-10-10 21:00:15 +04:00
8a96320fd5 Merge pull request 'bazunov_andrew_lab_1' (#33) from bazunov_andrew_lab_1 into main
Reviewed-on: Alexey/DAS_2024_1#33
2024-09-30 22:18:50 +04:00
bd25930973 Merge pull request 'tsukanova_irina_lab_2' (#32) from tsukanova_irina_lab_2 into main
Reviewed-on: Alexey/DAS_2024_1#32
2024-09-30 22:18:28 +04:00
37996c249a Merge pull request 'dolgov_dmitriy_lab_1' (#29) from dolgov_dmitriy_lab_1 into main
Reviewed-on: Alexey/DAS_2024_1#29
2024-09-26 10:25:37 +04:00
9456d4fe01 Merge pull request 'borschevskaya_anna_lab_2 is ready' (#25) from borschevskaya_anna_lab_2 into main
Reviewed-on: Alexey/DAS_2024_1#25
2024-09-26 10:20:55 +04:00
c14e105db5 Merge pull request 'presnyakova_victoria_lab_1' (#24) from presnyakova_victoria_lab_1 into main
Reviewed-on: Alexey/DAS_2024_1#24
2024-09-26 09:59:12 +04:00
4d1e900721 Merge pull request 'yakovleva_yulia_lab_2' (#20) from yakovleva_yulia_lab_2 into main
Reviewed-on: Alexey/DAS_2024_1#20
Reviewed-by: Alexey <a.zhelepov@mail.ru>
2024-09-26 08:45:08 +04:00
7184d6d728 Обновить bazunov_andrew_lab_1/README.md 2024-09-25 15:44:25 +04:00
Bazunov Andrew Igorevich
6e7055efa4 update readme 2024-09-25 15:37:57 +04:00
Bazunov Andrew Igorevich
9e40adc53c edit docker compose 2024-09-25 15:19:28 +04:00
Bazunov Andrew Igorevich
4a36528cc7 Complete docker compose 2024-09-25 12:35:39 +04:00
ad3988e5fc добавлено видео 2024-09-25 10:58:41 +04:00
780b4b2924 add readme and fix 2024-09-25 10:51:07 +04:00
5047b16cde files 2024-09-24 16:56:39 +04:00
2b87427299 что-то есть 2024-09-24 16:55:37 +04:00
JulYakJul
21cdd4971d fix link 2024-09-24 14:53:05 +04:00
bdb5cc07ed Обновить dolgov_dmitriy_lab_1/README.md 2024-09-24 01:30:02 +04:00
e761e33201 Обновить dolgov_dmitriy_lab_1/README.md 2024-09-24 01:28:51 +04:00
Аришина)
ceee500b95 ЛР 1 готова 2024-09-24 01:20:27 +04:00
2be2c71b69 перенос 2024-09-23 20:19:10 +04:00
JulYakJul
aa8180ba49 Merge branch 'main' into yakovleva_yulia_lab_2 2024-09-23 17:34:56 +04:00
JulYakJul
ba7480cb4f fix 2024-09-23 13:10:28 +04:00
520337f92d borschevskaya_anna_lab_2 is ready 2024-09-23 08:40:17 +04:00
06d1d8cdd4 lab1 2024-09-22 18:06:51 +04:00
JulYakJul
1adaac9281 yakovleva_yulia_lab_2 is ready 2024-09-20 18:36:39 +04:00
1609 changed files with 295583 additions and 0 deletions

2
bazunov_andrew_lab_1/.gitignore vendored Normal file
View File

@@ -0,0 +1,2 @@
ollama
./ollama

View File

@@ -0,0 +1,33 @@
# Распределенные вычисления и приложения Л1
## _Автор Базунов Андрей Игревич ПИбд-42_
В качестве сервисов были выбраны:
- 1.Ollama (_Сервис для использования LLMs моделей_)
- 2.Open Web Ui (_Сервис для удобного общения с моделью из сервиса Ollama_)
- 3.Gitea (_Гит сервис_)
# Docker
>Перед исполнением вполняем установку docker и проверяем версию
```sh
docker-compose --version
```
>Далее производим настройку файла docker-compose.yaml и запускаем контейнер
```sh
docker-compose up -d
```
>Для завершения работы контейнера используем команду
```sh
docker-compose down
```
---
> Замечание: после запуска контейнера, необходимо перейти в контейнер **ollamа** и выполнить установку модели [gemma2](https://ollama.com/library/gemma2:2b)
> ```sh
> docker-compose exec ollama ollama run ollama run gemma2:2b
> ```
---
Далее можно использовать веб сервис Open Web Ui по адресу **localhost:8080** для общения с моделью и Gitea по адресу **localhost:3000** - [демонстрация работы](https://vk.com/video/@viltskaa?z=video236673313_456239574%2Fpl_236673313_-2)

View File

@@ -0,0 +1,61 @@
services:
gitea: # Имя сервиса
image: gitea/gitea:latest # Имя образа
container_name: gitea # Имя контейнера, может быть произовольным
ports:
- "3000:3000" # Проброс порта Gitea на хост
volumes: # хранилище
- data:/data
environment: # переменные окружения
USER_UID: 1000
USER_GID: 1000
ollama:
image: ollama/ollama:latest
container_name: ollama
restart: always
ports:
- 7869:11434
pull_policy: always
tty: true
volumes:
- .:/code
- ./ollama/ollama:/root/.ollama # Директория для данных Ollama
environment:
- OLLAMA_KEEP_ALIVE=24h
- OLLAMA_HOST=0.0.0.0 # Указываем хост для API Ollama
networks:
- ollama-docker
command: ["serve"] # Запускаем Ollama в режиме сервера
ollama-webui:
image: ghcr.io/open-webui/open-webui:main # Образ Open Web UI
container_name: ollama-webui
restart: unless-stopped
volumes:
- ./ollama/ollama-webui:/app/backend/data
ports:
- 8080:8080 # Порт для веб-интерфейса
environment: # https://docs.openwebui.com/getting-started/env-configuration#default_models
- OLLAMA_BASE_URLS=http://host.docker.internal:7869
- ENV=dev
- WEBUI_AUTH=False
- WEBUI_NAME=Viltskaa AI
- WEBUI_URL=http://localhost:8080
- WEBUI_SECRET_KEY=t0p-s3cr3t
depends_on:
- ollama
extra_hosts:
- host.docker.internal:host-gateway
networks:
- ollama-docker
networks:
ollama-docker:
external: false
volumes:
ollama:
driver: local
data:
driver: local

38
borschevskaya_anna_lab_2/.gitignore vendored Normal file
View File

@@ -0,0 +1,38 @@
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/
### IntelliJ IDEA ###
.idea/modules.xml
.idea/jarRepositories.xml
.idea/compiler.xml
.idea/libraries/
*.iws
*.iml
*.ipr
### Eclipse ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/
### VS Code ###
.vscode/
### Mac OS ###
.DS_Store

View File

@@ -0,0 +1,43 @@
# Отчет. Лабораторная работа 2
В рамках лабораторной работы №2 были написаны два сервиса, работающих с текстовыми файлами.
Для первого сервиса был выбран вариант задания №5:
```
Ищет в каталоге /var/data файл с самым коротким названием и перекладывает его в /var/result/data.txt.
```
А для второго - №2:
```
Ищет наименьшее число из файла /var/data/data.txt и сохраняет его третью степень в /var/result/result.txt.
```
## Описание
Сначала сервис first перемещает данные из файла с самым коротким названием, находящегося в указанной примонтированной директории, в выходную папку.
Доступ к выходной папке имеет второй сервис, который выводит наименьшее число из помещенного первым сервисом файла
в третьей степени в выходной файл.
Выходной файл расположен в примонтированной директории и доступен на машине, где запускаются сервисы.
В Dockerfile используется многоэтапная сборка с использованием нескольких базовых образов на каждом этапе.
Описание значения каждой строки есть в Dockerfile в сервисе first.
В файле docker-compose.yml приведено описание новых строк, связанных с подключением примонтированных томов.
Стоит отметить, что для "общения" сервисов используется общий том common, который монтируется в контейнер по пути /var/result. Это позволяет сохранять результаты
работы первого сервиса для использования вторым сервисом.
## Как запустить
Для того, чтобы запустить сервисы, необходимо выполнить следующие действия:
1. Установить и запустить Docker Engine или Docker Desktop
2. Через консоль перейти в папку, в которой расположен файл docker-compose.yml
3. Выполнить команду:
```
docker compose up --build
```
В случае успешного запуска всех контейнеров в консоли будет выведено следующее сообщение:
```
✔ Network borschevskaya_anna_lab_2_default Created 0.1s
✔ Container borschevskaya_anna_lab_2-first-1 Created 0.1s
✔ Container borschevskaya_anna_lab_2-second-1 Created 0.1s
Attaching to borschevskaya_anna_lab_2-first-1, borschevskaya_anna_lab_2-second-1
```
Далее, в консоль каждого сервиса будут выведены сообщения о том, как прошла обработка файлов.
В случае отсутствия заданных значений переменных окружения INPUT_PATH и OUTPUT_PATH и
в иных исключительных ситуация будет выведена информация об этом.
## Видео-отчет
Работоспособность лабораторной работы можно оценить в следующем [видео](https://disk.yandex.ru/i/LFxdyRUFQDwXEQ).

View File

@@ -0,0 +1,22 @@
services:
first:
build: ./first # директория, в которой нужно искать Dockerfile для сборки первого сервиса
environment:
INPUT_PATH: /var/data/ # директория с входными данными для обработки файлов
OUTPUT_PATH: /var/result/ # директория с выходными данными обработки
volumes:
- ./volumes/input:/var/data # монтируется локальная папка с входными данными в папку внутри контейнера
- common:/var/result # монтируется общий для двух сервисов том, в который first сложит результаты обработки по варианту
second:
build: ./second # директория, в которой нужно искать Dockerfile для сборки второго сервиса
depends_on: # сервис second зависит от сервиса first и будет запущен после него
- first
environment:
INPUT_PATH: /var/result/
OUTPUT_PATH: /var/data/
volumes:
- ./volumes/output:/var/data
- common:/var/result # монтируется общий для двух сервисов том, из которого second получит результаты обработки first сервиса и выполнит свою логику
volumes:
common:

View File

@@ -0,0 +1,25 @@
# Используем образ Maven для сборки
FROM maven:3.8-eclipse-temurin-21-alpine AS build
# Устанавливаем рабочую директорию
WORKDIR /app
# Копируем только pom.xml и загружаем зависимости
# Так зависимости закэшируются в Docker при изменении кода закэшированные слои с зависимостями будут подгружаться быстрее
COPY pom.xml .
RUN mvn dependency:go-offline
# Копируем остальные исходные файлы
COPY src ./src
# Собираем весь проект
RUN mvn clean package -DskipTests
# Используем официальный образ JDK для запуска собранного jar-файла
FROM eclipse-temurin:21-jdk-alpine
# Копируем jar-файл из предыдущего этапа
COPY --from=build /app/target/*.jar /app.jar
# Указываем команду для запуска приложения
CMD ["java", "-jar", "app.jar"]

View File

@@ -0,0 +1,37 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>ru.first</groupId>
<artifactId>first</artifactId>
<version>1.0.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>21</maven.compiler.source>
<maven.compiler.target>21</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<build>
<plugins>
<plugin>
<!-- Build an executable JAR -->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.1.0</version>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<classpathPrefix>lib/</classpathPrefix>
<mainClass>ru.first.Main</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@@ -0,0 +1,50 @@
package ru.first;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Comparator;
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
import static java.util.Objects.isNull;
public class Main {
public static final String INPUT_PATH = System.getenv("INPUT_PATH");
public static final String OUTPUT_PATH = System.getenv("OUTPUT_PATH");
public static final String RESULT_FILE_NAME = "data.txt";
public static void main(String[] args) throws IOException {
if (isNull(INPUT_PATH) || INPUT_PATH.isEmpty() || isNull(OUTPUT_PATH) || OUTPUT_PATH.isEmpty()) {
System.out.printf("Отсутствуют переменные окружения INPUT_PATH = '%s' или OUTPUT_PATH = '%s'%n",
INPUT_PATH, OUTPUT_PATH);
return;
}
var inputPathDir = Path.of(INPUT_PATH);
if (!Files.exists(inputPathDir)) {
Files.createDirectory(inputPathDir);
}
var inputDirectory = new File(INPUT_PATH);
var allDirFiles = inputDirectory.listFiles();
if (isNull(allDirFiles) || allDirFiles.length == 0) {
System.out.println("Директория пуста");
return;
}
var dirFiles = Arrays.stream(allDirFiles).filter(File::isFile).toList();
if (dirFiles.isEmpty()) {
System.out.println("В указанной директории нет подходящих для обработки файлов");
return;
}
var shortestName = dirFiles.stream().min(Comparator.comparing(file -> file.getName().length())).get();
var outputPathDir = Path.of(OUTPUT_PATH);
if (!Files.exists(outputPathDir)) {
Files.createDirectory(outputPathDir);
}
var resultFilePath = Path.of(OUTPUT_PATH + File.separator + RESULT_FILE_NAME);
Files.move(Path.of(INPUT_PATH + File.separator + shortestName.getName()), resultFilePath, REPLACE_EXISTING);
}
}

View File

@@ -0,0 +1,16 @@
FROM maven:3.8-eclipse-temurin-21-alpine AS build
WORKDIR /app
COPY pom.xml .
RUN mvn dependency:go-offline
COPY src ./src
RUN mvn clean package -DskipTests
FROM eclipse-temurin:21-jdk-alpine
COPY --from=build /app/target/*.jar /app.jar
CMD ["java", "-jar", "app.jar"]

View File

@@ -0,0 +1,36 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>ru.second</groupId>
<artifactId>second</artifactId>
<version>1.0.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>21</maven.compiler.source>
<maven.compiler.target>21</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.1.0</version>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<classpathPrefix>lib/</classpathPrefix>
<mainClass>ru.second.Main</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@@ -0,0 +1,51 @@
package ru.second;
import java.io.File;
import java.io.FileWriter;
import java.nio.file.Files;
import static java.util.Objects.isNull;
public class Main {
public static final String INPUT_PATH = System.getenv("INPUT_PATH");
public static final String INPUT_FILE_NAME = "data.txt";
public static final String OUTPUT_PATH = System.getenv("OUTPUT_PATH");
public static final String RESULT_FILE_NAME = "result.txt";
public static void main(String[] args) {
if (isNull(INPUT_PATH) || INPUT_PATH.isEmpty() || isNull(OUTPUT_PATH) || OUTPUT_PATH.isEmpty()) {
System.out.printf("Отсутствуют переменные окружения INPUT_PATH = '%s' или OUTPUT_PATH = '%s'%n",
INPUT_PATH, OUTPUT_PATH);
return;
}
var inputFile = new File(INPUT_PATH + File.separator + INPUT_FILE_NAME);
if (!inputFile.exists()) {
System.out.println("Входной файл не существует");
return;
}
try (var stream = Files.lines(inputFile.toPath());
var writer = new FileWriter(OUTPUT_PATH + File.separator + RESULT_FILE_NAME);
) {
var min = stream.map(Main::parseInt).reduce(Integer::min);
if (min.isEmpty()) {
System.out.println("Не найдено минимальное значение среди строк файла");
return;
}
var minValue = Math.pow(min.get(), 3);
System.out.printf("Get min value = '%d'%n", min.get());
writer.append(Double.toString(minValue));
System.out.printf("To file %s was written value %f%n", RESULT_FILE_NAME, minValue);
} catch (Exception ex) {
System.out.println(ex.getMessage());
}
}
private static Integer parseInt(String line) {
line = line.replace("\\n", "");
return Integer.parseInt(line);
}
}

4
dolgov_dmitriy_lab_1/.gitignore vendored Normal file
View File

@@ -0,0 +1,4 @@
data/
log/
wordpress/
custom/

View File

@@ -0,0 +1,34 @@
# Лабораторная работа №1
## Выполнил: Долгов Дмитрий, группа ПИбд-42
### Были развёрнуты следующие сервисы:
* mediawiki (движок вики)
* wordpress (популярная система управления контентом)
* gitea (сервис для хранения репозиториев git)
* mariaDB
### Были использованы следующие технологии:
* git
* docker
* docker-compose
### Для запуска лабораторной работы необходимо ввести в консоль следующую команду:
```
docker compose up -d
```
## Результат запуска:
```
[+] Running 4/4
✔ Container dolgov_dmitriy_lab_1-wordpress-1 Running 0.0s
✔ Container dolgov_dmitriy_lab_1-database-1 Running 0.0s
✔ Container dolgov_dmitriy_lab_1-mediawiki-1 Running 0.0s
✔ Container gitea Running
```
## Видео с результатом запуска:
Видео можно посмотреть по данной [ссылке](https://drive.google.com/file/d/1hC6HhNvYBRuYVClobXyDMReA4ngwxhwc/view?usp=drive_link).

View File

@@ -0,0 +1,73 @@
# Сервисы по заданию
services:
# Сервис MediaWiki
mediawiki:
# Образ MediaWiki
image: mediawiki
# Автоматический перезапуск при сбое
restart: always
# проброс порта 80 из контейнера на порт 8080 хоста
ports:
- "8080:80"
# связь с сервисом database
links:
- database
# монтирование volume для хранения данных
volumes:
- images:/var/www/html/images
# Сервис WordPress
wordpress:
# Образ WordPress
image: wordpress:latest
# Автоматический перезапуск при сбое
ports:
- "8082:80"
restart: always
volumes:
- ./wordpress:/var/www/html
# Сервис Gitea
server:
image: gitea/gitea:latest
container_name: gitea
restart: always
environment:
- USER_UID=1000
- USER_GID=1000
volumes:
- ./data:/data
- ./custom:/app/gitea/custom
- ./log:/app/gitea/log
ports:
- "8081:3000"
links:
- database
depends_on:
- database
# Сервис MariaDB
database:
# Образ MariaDB
image: mariadb
# Автоматический перезапуск при сбое
restart: always
# переменные окружения для настройки базы данных
environment:
# имя базы данных
MYSQL_DATABASE: my_wiki
# имя пользователя
MYSQL_USER: user
# пароль пользователя
MYSQL_PASSWORD: user
# случайный пароль для пользователя root
MYSQL_RANDOM_ROOT_PASSWORD: 'yes'
# монтирование volume для хранения данных
volumes:
- db:/var/lib/mysql
# тома для хранения данных
volumes:
images:
db:

Binary file not shown.

After

Width:  |  Height:  |  Size: 275 KiB

View File

@@ -0,0 +1,29 @@
services:
rabbitmq:
image: rabbitmq:3.12.8-management
environment:
RABBITMQ_DEFAULT_USER: admin
RABBITMQ_DEFAULT_PASS: admin
ports:
- 15672:15672
volumes:
- rabbitmq-data:/var/lib/rabbitmq
mediawiki:
image: mediawiki
ports:
- 8081:80
volumes:
- mediawiki-data:/var/files/mediawiki
wordpress:
image: wordpress
ports:
- 8082:80
volumes:
- wordpress-data:/var/files/wordpress
volumes:
rabbitmq-data:
mediawiki-data:
wordpress-data:

View File

@@ -0,0 +1,26 @@
# Docker Compose: RabbitMQ, Mediawiki, Wordpress
## Описание проекта
Этот проект разворачивает три сервиса с помощью Docker Compose:
1. **RabbitMQ** — брокер сообщений.
2. **Mediawiki** — движок вики.
3. **Wordpress** — популярная система управления контентом.
## Команды для запуска
Из директории с файлом docker-compose.yml запустить сервисы docker-compose up --build
## Сервисы и порты
1. **RabbitMQ:**
Доступ по адресу http://localhost:15672/ (логин: admin, пароль: admin).
2. **Mediawiki:**
Доступ по адресу http://localhost:8081/.
3. **Wordpress:**
Доступ по адресу http://localhost:8082/.
## Видео https://drive.google.com/file/d/1NvsMFoMU2ecsQ17EouqB_ZaLBskonHv0/view?usp=sharing

2
tsukanova_irina_lab_2/.gitignore vendored Normal file
View File

@@ -0,0 +1,2 @@
/.venv
/.idea

View File

@@ -0,0 +1,16 @@
# Цуканова Ирина ПИбд-32
# Лабораторная работа №2 - Разработка простейшего распределённого приложения
### Язык разработки приложений: Python
## Выбранные варианты
- Для программы 1: Ищет в каталоге ```/var/data``` самый большой по объёму файл и перекладывает его в ```/var/result/data.txt```.
- Для программы 2: Сохраняет произведение первого и последнего числа из файла ```/var/result/data.txt``` в ```/var/result/result.txt```.
## Описание:
Каждая программа лежит в своей папке, первая в worker-1, вторая в worker-2.
В этих же папках лежат Dockerfile'ы с инструкциями по сборке. В них присутствуют комментарии для значимых строк.
Монтированные папки ```data``` для ```/var/data``` и ```result``` для ```/var/result```.
## [Видео](https://drive.google.com/file/d/1eBbIDgTo3MF4EeM677EPEKgJEINekaC0/view?usp=drive_link)

View File

@@ -0,0 +1 @@
34 905 63 92 74 9 3 25 8 0 2 4 24 452 94 6 2 4 2 65 83 73 672 47 23 21 1

View File

@@ -0,0 +1 @@
4 9 6 320 75 348 12 75 94 63 45 23 3

View File

@@ -0,0 +1 @@
5 34 7 9 6 43 5 768 4 23 1 3 657 534 4 3 87 6 9 8 56 37 525 5 7 3 2 65 4 86 7 295 473 254 633 4 45 2

View File

@@ -0,0 +1,18 @@
services:
worker_one:
container_name: worker_one
build:
dockerfile: ./worker-1
volumes:
- ./data:/var/data
- ./result:/var/result
worker_two:
container_name: worker_two
build:
dockerfile: ./worker-2
volumes:
- ./result:/var/result
depends_on:
- worker_one

View File

@@ -0,0 +1 @@
5 34 7 9 6 43 5 768 4 23 1 3 657 534 4 3 87 6 9 8 56 37 525 5 7 3 2 65 4 86 7 295 473 254 633 4 45 2

View File

@@ -0,0 +1 @@
10

View File

@@ -0,0 +1,11 @@
# Использую базовый образ Python
FROM python:3.12-slim
# Устанавливаю рабочую директорию внутри контейнера
WORKDIR /app
# Копирую все файлы в контейнер
COPY worker_1.py .
# Команда для запуска Python-скрипта
CMD ["python", "worker_1.py"]

View File

@@ -0,0 +1,27 @@
import os
import shutil
# Ищет в каталоге /var/data самый большой по объёму файл и перекладывает его в /var/result/data.txt.
print("start worker_1")
dir_data = "/var/data"
dir_res = "/var/result"
if not os.path.exists(dir_data):
os.mkdir(dir_data)
if not os.path.exists(dir_res):
os.mkdir(dir_res)
largest_file = None
largest_size = 0
for root, dirs, files in os.walk(dir_data):
for file in files:
file_path = os.path.join(root, file)
file_size = os.path.getsize(file_path)
if file_size > largest_size:
largest_size = file_size
largest_file = file_path
if largest_file:
shutil.copyfile(largest_file, dir_res + "/data.txt")

View File

@@ -0,0 +1,11 @@
# Использую базовый образ Python
FROM python:3.12-slim
# Устанавливаю рабочую директорию внутри контейнера
WORKDIR /app
# Копирую все файлы в контейнер
COPY worker_2.py .
# Команда для запуска Python-скрипта
CMD ["python", "worker_2.py"]

View File

@@ -0,0 +1,19 @@
# Сохраняет произведение первого и последнего числа из файла /var/result/data.txt в /var/result/result.txt.
print("start worker_2")
with open('/var/result/data.txt', 'r') as f:
numbers = [int(num) for num in f.read().split() if num.isdigit()]
if numbers:
first_number = numbers[0]
last_number = numbers[-1]
result = first_number * last_number
with open('/var/result/result.txt', 'w') as f:
f.write(f"{result}\n")
print(f"Получен результат - {result}")
else:
print("Результат не получен. Файл не содержит чисел")

View File

@@ -0,0 +1,27 @@
import pika
import time
def callback(ch, method, properties, body):
print(f'Consumer 1 получил сообщение: {body.decode()}')
# Время задержки по условию
time.sleep(2)
print('Consumer 1 закончил обработку')
def consume_events_1():
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# Создание очереди
channel.queue_declare(queue='consumer1_queue')
# Привязка очереди
channel.queue_bind(exchange='beauty_salon_events', queue='consumer1_queue')
channel.basic_consume(queue='consumer1_queue', on_message_callback=callback, auto_ack=True)
print('Consumer 1 начал ожидать сообщения...')
channel.start_consuming()
if __name__ == "__main__":
consume_events_1()

View File

@@ -0,0 +1,25 @@
import pika
def callback(ch, method, properties, body):
print(f'Consumer 2 получил сообщение: {body.decode()}')
# Обработка "нон-стопом"
print('Consumer 2 закончил обработку')
def consume_events_2():
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# Создание очереди
channel.queue_declare(queue='consumer2_queue')
# Привязка очереди
channel.queue_bind(exchange='beauty_salon_events', queue='consumer2_queue')
channel.basic_consume(queue='consumer2_queue', on_message_callback=callback, auto_ack=True)
print('Consumer 2 начал ожидать сообщения...')
channel.start_consuming()
if __name__ == "__main__":
consume_events_2()

View File

@@ -0,0 +1,26 @@
import pika
import time
def publish_events():
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# Создание exchange типа fanout
channel.exchange_declare(exchange='beauty_salon_events', exchange_type='fanout')
events = [
"Пришёл заказ на услуги",
"Сообщение от пользователя",
"Необходимо создать отчёт",
"Запись на процедуру",
"Пользователь отменил запись"
]
while True:
event = events[int(time.time()) % len(events)]
channel.basic_publish(exchange='beauty_salon_events', routing_key='', body=event)
print(f'Отправлено: {event}')
time.sleep(1)
if __name__ == "__main__":
publish_events()

View File

@@ -0,0 +1,60 @@
# Лабораторная работа №4 - Работа с брокером сообщений
## Задание
#### Цель:
Изучение проектирования приложений при помощи брокера сообщений.
#### Задачи:
* Установить брокер сообщений RabbitMQ.
* Пройти уроки 1, 2 и 3 из RabbitMQ Tutorials на любом языке программирования.
* Продемонстрировать работу брокера сообщений.
### Классы:
&nbsp;1. ```Publisher``` - класс, отвечающий за отправку сообщений
&nbsp;2. ```Consumer1``` - класс, отвечающий за принятие и обработку сообщений за задержкой 3 секунды
&nbsp;2. ```Consumer2``` - класс, отвечающий за принятие и обработку сообщений без задержек
#### Ход работы:
На компьютер был установлен брокер сообщений ```RabbitMQ```, после чего все три класса программы были одновременно запущены.
## Работа программы:
Класс ```Publisher``` успешно осуществляет отправку сообщений своим клиентам.
Класс ```Consumer1``` осуществляет принятие и обработку сообщений с задержкой в 3 секунды, это можно заметить на видео.
Класс ```Consumer2``` мгновенно осуществляет принятие и обработку сообщений.
## Работа с RabbitMQ Management UI
![](Rabbit.png "")
### Очередь ```Consumer1```
![](consumer1.png "")
### Очередь ```Consumer2```
![](consumer2.png "")
![](consumer2_2.png "")
![](consumer3-1.png "")
#### Уроки
Урок 1:
![](lesson1.png "")
Урок 2:
![](lesson2.png "")
Урок 3:
![](lesson3.png "")
# ВК
https://vk.com/video256017065_456239872

Binary file not shown.

After

Width:  |  Height:  |  Size: 65 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 20 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 18 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 17 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 16 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 66 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 57 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 104 KiB

View File

@@ -0,0 +1,25 @@
import pika, sys, os
def main():
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(f" [x] Received {body}")
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
print('Interrupted')
try:
sys.exit(0)
except SystemExit:
os._exit(0)

View File

@@ -0,0 +1,11 @@
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()

View File

@@ -0,0 +1,19 @@
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(
exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode=pika.DeliveryMode.Persistent
))
print(f" [x] Sent {message}")
connection.close()

View File

@@ -0,0 +1,23 @@
#!/usr/bin/env python
import pika
import time
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(f" [x] Received {body.decode()}")
time.sleep(body.count(b'.'))
print(" [x] Done")
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)
channel.start_consuming()

View File

@@ -0,0 +1,13 @@
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(f" [x] Sent {message}")
connection.close()

View File

@@ -0,0 +1,22 @@
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs', queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(f" [x] {body}")
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()

View File

@@ -0,0 +1,222 @@
# don't import any costly modules
import sys
import os
is_pypy = '__pypy__' in sys.builtin_module_names
def warn_distutils_present():
if 'distutils' not in sys.modules:
return
if is_pypy and sys.version_info < (3, 7):
# PyPy for 3.6 unconditionally imports distutils, so bypass the warning
# https://foss.heptapod.net/pypy/pypy/-/blob/be829135bc0d758997b3566062999ee8b23872b4/lib-python/3/site.py#L250
return
import warnings
warnings.warn(
"Distutils was imported before Setuptools, but importing Setuptools "
"also replaces the `distutils` module in `sys.modules`. This may lead "
"to undesirable behaviors or errors. To avoid these issues, avoid "
"using distutils directly, ensure that setuptools is installed in the "
"traditional way (e.g. not an editable install), and/or make sure "
"that setuptools is always imported before distutils."
)
def clear_distutils():
if 'distutils' not in sys.modules:
return
import warnings
warnings.warn("Setuptools is replacing distutils.")
mods = [
name
for name in sys.modules
if name == "distutils" or name.startswith("distutils.")
]
for name in mods:
del sys.modules[name]
def enabled():
"""
Allow selection of distutils by environment variable.
"""
which = os.environ.get('SETUPTOOLS_USE_DISTUTILS', 'local')
return which == 'local'
def ensure_local_distutils():
import importlib
clear_distutils()
# With the DistutilsMetaFinder in place,
# perform an import to cause distutils to be
# loaded from setuptools._distutils. Ref #2906.
with shim():
importlib.import_module('distutils')
# check that submodules load as expected
core = importlib.import_module('distutils.core')
assert '_distutils' in core.__file__, core.__file__
assert 'setuptools._distutils.log' not in sys.modules
def do_override():
"""
Ensure that the local copy of distutils is preferred over stdlib.
See https://github.com/pypa/setuptools/issues/417#issuecomment-392298401
for more motivation.
"""
if enabled():
warn_distutils_present()
ensure_local_distutils()
class _TrivialRe:
def __init__(self, *patterns):
self._patterns = patterns
def match(self, string):
return all(pat in string for pat in self._patterns)
class DistutilsMetaFinder:
def find_spec(self, fullname, path, target=None):
# optimization: only consider top level modules and those
# found in the CPython test suite.
if path is not None and not fullname.startswith('test.'):
return
method_name = 'spec_for_{fullname}'.format(**locals())
method = getattr(self, method_name, lambda: None)
return method()
def spec_for_distutils(self):
if self.is_cpython():
return
import importlib
import importlib.abc
import importlib.util
try:
mod = importlib.import_module('setuptools._distutils')
except Exception:
# There are a couple of cases where setuptools._distutils
# may not be present:
# - An older Setuptools without a local distutils is
# taking precedence. Ref #2957.
# - Path manipulation during sitecustomize removes
# setuptools from the path but only after the hook
# has been loaded. Ref #2980.
# In either case, fall back to stdlib behavior.
return
class DistutilsLoader(importlib.abc.Loader):
def create_module(self, spec):
mod.__name__ = 'distutils'
return mod
def exec_module(self, module):
pass
return importlib.util.spec_from_loader(
'distutils', DistutilsLoader(), origin=mod.__file__
)
@staticmethod
def is_cpython():
"""
Suppress supplying distutils for CPython (build and tests).
Ref #2965 and #3007.
"""
return os.path.isfile('pybuilddir.txt')
def spec_for_pip(self):
"""
Ensure stdlib distutils when running under pip.
See pypa/pip#8761 for rationale.
"""
if self.pip_imported_during_build():
return
clear_distutils()
self.spec_for_distutils = lambda: None
@classmethod
def pip_imported_during_build(cls):
"""
Detect if pip is being imported in a build script. Ref #2355.
"""
import traceback
return any(
cls.frame_file_is_setup(frame) for frame, line in traceback.walk_stack(None)
)
@staticmethod
def frame_file_is_setup(frame):
"""
Return True if the indicated frame suggests a setup.py file.
"""
# some frames may not have __file__ (#2940)
return frame.f_globals.get('__file__', '').endswith('setup.py')
def spec_for_sensitive_tests(self):
"""
Ensure stdlib distutils when running select tests under CPython.
python/cpython#91169
"""
clear_distutils()
self.spec_for_distutils = lambda: None
sensitive_tests = (
[
'test.test_distutils',
'test.test_peg_generator',
'test.test_importlib',
]
if sys.version_info < (3, 10)
else [
'test.test_distutils',
]
)
for name in DistutilsMetaFinder.sensitive_tests:
setattr(
DistutilsMetaFinder,
f'spec_for_{name}',
DistutilsMetaFinder.spec_for_sensitive_tests,
)
DISTUTILS_FINDER = DistutilsMetaFinder()
def add_shim():
DISTUTILS_FINDER in sys.meta_path or insert_shim()
class shim:
def __enter__(self):
insert_shim()
def __exit__(self, exc, value, tb):
remove_shim()
def insert_shim():
sys.meta_path.insert(0, DISTUTILS_FINDER)
def remove_shim():
try:
sys.meta_path.remove(DISTUTILS_FINDER)
except ValueError:
pass

View File

@@ -0,0 +1 @@
__import__('_distutils_hack').do_override()

View File

@@ -0,0 +1 @@
import os; var = 'SETUPTOOLS_USE_DISTUTILS'; enabled = os.environ.get(var, 'local') == 'local'; enabled and __import__('_distutils_hack').add_shim();

View File

@@ -0,0 +1,25 @@
Copyright (c) 2009-2021, Tony Garnock-Jones, Gavin M. Roy, Pivotal Software, Inc and others.
All rights reserved.
Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
* Neither the name of the Pika project nor the names of its contributors may be used
to endorse or promote products derived from this software without specific
prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

View File

@@ -0,0 +1,320 @@
Metadata-Version: 2.1
Name: pika
Version: 1.3.2
Summary: Pika Python AMQP Client Library
Maintainer-email: "Gavin M. Roy" <gavinmroy@gmail.com>, Luke Bakken <lukerbakken@gmail.com>
License: BSD-3-Clause
Project-URL: Homepage, https://pika.readthedocs.io
Project-URL: Source, https://github.com/pika/pika
Classifier: Development Status :: 5 - Production/Stable
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: BSD License
Classifier: Natural Language :: English
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.7
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: Implementation :: CPython
Classifier: Programming Language :: Python :: Implementation :: Jython
Classifier: Programming Language :: Python :: Implementation :: PyPy
Classifier: Topic :: Communications
Classifier: Topic :: Internet
Classifier: Topic :: Software Development :: Libraries
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Topic :: System :: Networking
Requires-Python: >=3.7
Description-Content-Type: text/x-rst
License-File: LICENSE
Provides-Extra: gevent
Requires-Dist: gevent ; extra == 'gevent'
Provides-Extra: tornado
Requires-Dist: tornado ; extra == 'tornado'
Provides-Extra: twisted
Requires-Dist: twisted ; extra == 'twisted'
Pika
====
Pika is a RabbitMQ (AMQP 0-9-1) client library for Python.
|Version| |Python versions| |Actions Status| |Coverage| |License| |Docs|
Introduction
------------
Pika is a pure-Python implementation of the AMQP 0-9-1 protocol including
RabbitMQ's extensions.
- Supports Python 3.7+ (`1.1.0` was the last version to support 2.7)
- Since threads aren't appropriate to every situation, it doesn't require
threads. Pika core takes care not to forbid them, either. The same goes for
greenlets, callbacks, continuations, and generators. An instance of Pika's
built-in connection adapters isn't thread-safe, however.
- People may be using direct sockets, plain old ``select()``, or any of the
wide variety of ways of getting network events to and from a Python
application. Pika tries to stay compatible with all of these, and to make
adapting it to a new environment as simple as possible.
Documentation
-------------
Pika's documentation can be found at https://pika.readthedocs.io.
Example
-------
Here is the most simple example of use, sending a message with the
``pika.BlockingConnection`` adapter:
.. code :: python
import pika
connection = pika.BlockingConnection()
channel = connection.channel()
channel.basic_publish(exchange='test', routing_key='test',
body=b'Test message.')
connection.close()
And an example of writing a blocking consumer:
.. code :: python
import pika
connection = pika.BlockingConnection()
channel = connection.channel()
for method_frame, properties, body in channel.consume('test'):
# Display the message parts and acknowledge the message
print(method_frame, properties, body)
channel.basic_ack(method_frame.delivery_tag)
# Escape out of the loop after 10 messages
if method_frame.delivery_tag == 10:
break
# Cancel the consumer and return any pending messages
requeued_messages = channel.cancel()
print('Requeued %i messages' % requeued_messages)
connection.close()
Pika provides the following adapters
------------------------------------
- ``pika.adapters.asyncio_connection.AsyncioConnection`` - asynchronous adapter
for Python 3 `AsyncIO <https://docs.python.org/3/library/asyncio.html>`_'s
I/O loop.
- ``pika.BlockingConnection`` - synchronous adapter on top of library for
simple usage.
- ``pika.SelectConnection`` - asynchronous adapter without third-party
dependencies.
- ``pika.adapters.gevent_connection.GeventConnection`` - asynchronous adapter
for use with `Gevent <http://www.gevent.org>`_'s I/O loop.
- ``pika.adapters.tornado_connection.TornadoConnection`` - asynchronous adapter
for use with `Tornado <http://tornadoweb.org>`_'s I/O loop.
- ``pika.adapters.twisted_connection.TwistedProtocolConnection`` - asynchronous
adapter for use with `Twisted <http://twistedmatrix.com>`_'s I/O loop.
Multiple connection parameters
------------------------------
You can also pass multiple ``pika.ConnectionParameters`` instances for
fault-tolerance as in the code snippet below (host names are just examples, of
course). To enable retries, set ``connection_attempts`` and ``retry_delay`` as
needed in the last ``pika.ConnectionParameters`` element of the sequence.
Retries occur after connection attempts using all of the given connection
parameters fail.
.. code :: python
import pika
parameters = (
pika.ConnectionParameters(host='rabbitmq.zone1.yourdomain.com'),
pika.ConnectionParameters(host='rabbitmq.zone2.yourdomain.com',
connection_attempts=5, retry_delay=1))
connection = pika.BlockingConnection(parameters)
With non-blocking adapters, such as ``pika.SelectConnection`` and
``pika.adapters.asyncio_connection.AsyncioConnection``, you can request a
connection using multiple connection parameter instances via the connection
adapter's ``create_connection()`` class method.
Requesting message acknowledgements from another thread
-------------------------------------------------------
The single-threaded usage constraint of an individual Pika connection adapter
instance may result in a dropped AMQP/stream connection due to AMQP heartbeat
timeout in consumers that take a long time to process an incoming message. A
common solution is to delegate processing of the incoming messages to another
thread, while the connection adapter's thread continues to service its I/O
loop's message pump, permitting AMQP heartbeats and other I/O to be serviced in
a timely fashion.
Messages processed in another thread may not be acknowledged directly from that
thread, since all accesses to the connection adapter instance must be from a
single thread, which is the thread running the adapter's I/O loop. This is
accomplished by requesting a callback to be executed in the adapter's
I/O loop thread. For example, the callback function's implementation might look
like this:
.. code :: python
def ack_message(channel, delivery_tag):
"""Note that `channel` must be the same Pika channel instance via which
the message being acknowledged was retrieved (AMQP protocol constraint).
"""
if channel.is_open:
channel.basic_ack(delivery_tag)
else:
# Channel is already closed, so we can't acknowledge this message;
# log and/or do something that makes sense for your app in this case.
pass
The code running in the other thread may request the ``ack_message()`` function
to be executed in the connection adapter's I/O loop thread using an
adapter-specific mechanism:
- ``pika.BlockingConnection`` abstracts its I/O loop from the application and
thus exposes ``pika.BlockingConnection.add_callback_threadsafe()``. Refer to
this method's docstring for additional information. For example:
.. code :: python
connection.add_callback_threadsafe(functools.partial(ack_message, channel, delivery_tag))
- When using a non-blocking connection adapter, such as
``pika.adapters.asyncio_connection.AsyncioConnection`` or
``pika.SelectConnection``, you use the underlying asynchronous framework's
native API for requesting an I/O loop-bound callback from another thread. For
example, ``pika.SelectConnection``'s I/O loop provides
``add_callback_threadsafe()``,
``pika.adapters.tornado_connection.TornadoConnection``'s I/O loop has
``add_callback()``, while
``pika.adapters.asyncio_connection.AsyncioConnection``'s I/O loop exposes
``call_soon_threadsafe()``.
This threadsafe callback request mechanism may also be used to delegate
publishing of messages, etc., from a background thread to the connection
adapter's thread.
Connection recovery
-------------------
Some RabbitMQ clients (Bunny, Java, .NET, Objective-C, Swift) provide a way to
automatically recover a connection, its channels and topology (e.g. queues,
bindings and consumers) after a network failure. Others require connection
recovery to be performed by the application code and strive to make it a
straightforward process. Pika falls into the second category.
Pika supports multiple connection adapters. They take different approaches to
connection recovery.
For ``pika.BlockingConnection`` adapter exception handling can be used to check
for connection errors. Here is a very basic example:
.. code :: python
import pika
while True:
try:
connection = pika.BlockingConnection()
channel = connection.channel()
channel.basic_consume('test', on_message_callback)
channel.start_consuming()
# Don't recover if connection was closed by broker
except pika.exceptions.ConnectionClosedByBroker:
break
# Don't recover on channel errors
except pika.exceptions.AMQPChannelError:
break
# Recover on all other connection errors
except pika.exceptions.AMQPConnectionError:
continue
This example can be found in `examples/consume_recover.py`.
Generic operation retry libraries such as
`retry <https://github.com/invl/retry>`_ can be used. Decorators make it
possible to configure some additional recovery behaviours, like delays between
retries and limiting the number of retries:
.. code :: python
from retry import retry
@retry(pika.exceptions.AMQPConnectionError, delay=5, jitter=(1, 3))
def consume():
connection = pika.BlockingConnection()
channel = connection.channel()
channel.basic_consume('test', on_message_callback)
try:
channel.start_consuming()
# Don't recover connections closed by server
except pika.exceptions.ConnectionClosedByBroker:
pass
consume()
This example can be found in `examples/consume_recover_retry.py`.
For asynchronous adapters, use ``on_close_callback`` to react to connection
failure events. This callback can be used to clean up and recover the
connection.
An example of recovery using ``on_close_callback`` can be found in
`examples/asynchronous_consumer_example.py`.
Contributing
------------
To contribute to Pika, please make sure that any new features or changes to
existing functionality **include test coverage**.
*Pull requests that add or change code without adequate test coverage will be
rejected.*
Additionally, please format your code using
`Yapf <http://pypi.python.org/pypi/yapf>`_ with ``google`` style prior to
issuing your pull request. *Note: only format those lines that you have changed
in your pull request. If you format an entire file and change code outside of
the scope of your PR, it will likely be rejected.*
Extending to support additional I/O frameworks
----------------------------------------------
New non-blocking adapters may be implemented in either of the following ways:
- By subclassing ``pika.BaseConnection``, implementing its abstract method and
passing its constructor an implementation of
``pika.adapters.utils.nbio_interface.AbstractIOServices``.
``pika.BaseConnection`` implements ``pika.connection.Connection``'s abstract
methods, including internally-initiated connection logic. For examples, refer
to the implementations of
``pika.adapters.asyncio_connection.AsyncioConnection``,
``pika.adapters.gevent_connection.GeventConnection`` and
``pika.adapters.tornado_connection.TornadoConnection``.
- By subclassing ``pika.connection.Connection`` and implementing its abstract
methods. This approach facilitates implementation of custom
connection-establishment and transport mechanisms. For an example, refer to
the implementation of
``pika.adapters.twisted_connection.TwistedProtocolConnection``.
.. |Version| image:: https://img.shields.io/pypi/v/pika.svg?
:target: http://badge.fury.io/py/pika
.. |Python versions| image:: https://img.shields.io/pypi/pyversions/pika.svg
:target: https://pypi.python.org/pypi/pika
.. |Actions Status| image:: https://github.com/pika/pika/actions/workflows/main.yaml/badge.svg
:target: https://github.com/pika/pika/actions/workflows/main.yaml
.. |Coverage| image:: https://img.shields.io/codecov/c/github/pika/pika.svg?
:target: https://codecov.io/github/pika/pika?branch=main
.. |License| image:: https://img.shields.io/pypi/l/pika.svg?
:target: https://pika.readthedocs.io
.. |Docs| image:: https://readthedocs.org/projects/pika/badge/?version=stable
:target: https://pika.readthedocs.io
:alt: Documentation Status

View File

@@ -0,0 +1,68 @@
pika-1.3.2.dist-info/INSTALLER,sha256=zuuue4knoyJ-UwPPXg8fezS7VCrXJQrAP7zeNuwvFQg,4
pika-1.3.2.dist-info/LICENSE,sha256=21HeRCNF3Ho285qYfFcVgx8Kfy0lg65jKQFNDDIvYrk,1552
pika-1.3.2.dist-info/METADATA,sha256=VHmz3pPAkWR45GIHPzGR3dswemh0G--XHdeB_xsqXZ4,13052
pika-1.3.2.dist-info/RECORD,,
pika-1.3.2.dist-info/REQUESTED,sha256=47DEQpj8HBSa-_TImW-5JCeuQeRkm5NMpJWZG3hSuFU,0
pika-1.3.2.dist-info/WHEEL,sha256=pkctZYzUS4AYVn6dJ-7367OJZivF2e8RA9b_ZBjif18,92
pika-1.3.2.dist-info/top_level.txt,sha256=ATSgSqw16wMD-Dht8AJlcJobg1xKjOwpbPEvKZJqfzg,5
pika-1.3.2.dist-info/zip-safe,sha256=AbpHGcgLb-kRsJGnwFEktk7uzpZOCcBY74-YBdrKVGs,1
pika/__init__.py,sha256=fGTPYlzKxqsnr6aYkMTkp1GVLl71BXcXtCY7QLSyTvI,693
pika/__pycache__/__init__.cpython-311.pyc,,
pika/__pycache__/amqp_object.cpython-311.pyc,,
pika/__pycache__/callback.cpython-311.pyc,,
pika/__pycache__/channel.cpython-311.pyc,,
pika/__pycache__/compat.cpython-311.pyc,,
pika/__pycache__/connection.cpython-311.pyc,,
pika/__pycache__/credentials.cpython-311.pyc,,
pika/__pycache__/data.cpython-311.pyc,,
pika/__pycache__/delivery_mode.cpython-311.pyc,,
pika/__pycache__/diagnostic_utils.cpython-311.pyc,,
pika/__pycache__/exceptions.cpython-311.pyc,,
pika/__pycache__/exchange_type.cpython-311.pyc,,
pika/__pycache__/frame.cpython-311.pyc,,
pika/__pycache__/heartbeat.cpython-311.pyc,,
pika/__pycache__/spec.cpython-311.pyc,,
pika/__pycache__/tcp_socket_opts.cpython-311.pyc,,
pika/__pycache__/validators.cpython-311.pyc,,
pika/adapters/__init__.py,sha256=GTUPWmxBgTEuZp0zrOqX9GX5gIwZfv3nTWi1uXwdBUU,994
pika/adapters/__pycache__/__init__.cpython-311.pyc,,
pika/adapters/__pycache__/asyncio_connection.cpython-311.pyc,,
pika/adapters/__pycache__/base_connection.cpython-311.pyc,,
pika/adapters/__pycache__/blocking_connection.cpython-311.pyc,,
pika/adapters/__pycache__/gevent_connection.cpython-311.pyc,,
pika/adapters/__pycache__/select_connection.cpython-311.pyc,,
pika/adapters/__pycache__/tornado_connection.cpython-311.pyc,,
pika/adapters/__pycache__/twisted_connection.cpython-311.pyc,,
pika/adapters/asyncio_connection.py,sha256=z9SmKmFCft_PPq-mD_AS_PsOijMHwH2mAnMVsTEmkVY,9235
pika/adapters/base_connection.py,sha256=5QpA2q1qlV8sHSMHITVb6hDCbn3ErXQOc23MQi66oOQ,20525
pika/adapters/blocking_connection.py,sha256=gtXhod6U649z5WvnpWfdUoi9qZtkbknT1DioYt6Gkmw,110248
pika/adapters/gevent_connection.py,sha256=RM-Ol6PcFyrkBqFx7tOVOhXQGZKqHGgOSuLPO7fgqdA,16735
pika/adapters/select_connection.py,sha256=Jp4JfqhptZ955apFhymJzM2oK7P8O9D7hKf0fTZ9xC0,45162
pika/adapters/tornado_connection.py,sha256=q5QwUBkC_pSq8kr9R3TorInpXINM9sbFHYJsj2-4rjM,3559
pika/adapters/twisted_connection.py,sha256=hACt_5Lxs5F0nsbe5qG-h9lrKEPfL_VGnjUhonet4Is,50294
pika/adapters/utils/__init__.py,sha256=47DEQpj8HBSa-_TImW-5JCeuQeRkm5NMpJWZG3hSuFU,0
pika/adapters/utils/__pycache__/__init__.cpython-311.pyc,,
pika/adapters/utils/__pycache__/connection_workflow.cpython-311.pyc,,
pika/adapters/utils/__pycache__/io_services_utils.cpython-311.pyc,,
pika/adapters/utils/__pycache__/nbio_interface.cpython-311.pyc,,
pika/adapters/utils/__pycache__/selector_ioloop_adapter.cpython-311.pyc,,
pika/adapters/utils/connection_workflow.py,sha256=8qDbckjKXHv9WmymheKr6pm1_KQm9cKSf11ipSW_ydE,33845
pika/adapters/utils/io_services_utils.py,sha256=fCYJFW7t3cN3aYW9GuUBnMHf0Aq-UlytnOulufer2sg,53574
pika/adapters/utils/nbio_interface.py,sha256=fiz6CMh5FvrV0_cjO1DsrpA4YINRYhWIdVp67SZLh3g,17217
pika/adapters/utils/selector_ioloop_adapter.py,sha256=F55JelsVb3agQ7z-1V8I3fo1JxTxhXy-0J53KshqYVI,19955
pika/amqp_object.py,sha256=-7u2LCPy-DQODB30Jt7WfxsINuMXmSu5BjqITCzWMbk,1830
pika/callback.py,sha256=l_9kQBW5Ta4f1tPuVUaddjfFqwu1uPyoBPDXSP4-KLw,14743
pika/channel.py,sha256=ynFkBA7JY5Axvi5uCGhhKmfU4IjbngO0SRhoBF6NMAw,63112
pika/compat.py,sha256=f_zeLXYgzwj8zFh6ZjUa7b-vEpm7c7hK_4bMYrfH28E,7221
pika/connection.py,sha256=14L1q-u1Jc1t_HG062E5TWRSdHgTui_tmrOAzcoFhGw,87627
pika/credentials.py,sha256=1EIMKuFrWLqCWitRFiqLRCx25HTYid-tnQGH-4gJ4Yg,4646
pika/data.py,sha256=SxYCuololigT9w3tLvZY3AJtBcSb0UJoxXoQlvZ7Ong,9956
pika/delivery_mode.py,sha256=nMjT1H7REUFhxPtvqsovQy03AemnhKLcmlq3UQ0NeSI,87
pika/diagnostic_utils.py,sha256=nxYnlMJejjoDtfUx_3QBgy1y-jwy1bghEYfejphakuY,1759
pika/exceptions.py,sha256=CjT1lYgvUe79zGOAmvTGsylTUKGz_YCdnzL7FDYyhqw,10117
pika/exchange_type.py,sha256=afA-c21QmVNuuUHKoHYYvrIg6oUOXThd4lpPsDz1ilE,390
pika/frame.py,sha256=0IhYLWvEeRvHhB4K-z3jF6Dv_FoxskzNuAXhq9LFpuo,7744
pika/heartbeat.py,sha256=hoORwZHRZsi3Im6o7y05H86dTiO9-XAK4s28wsjjiK8,8170
pika/spec.py,sha256=2pQLO4hFOu0QNRA1ry6kGpqJ6yQRAZviMoOrmrb4CYQ,80448
pika/tcp_socket_opts.py,sha256=QPU762BQKqY94RohuG57fz0EZ5CyabezcJxySouckAs,1513
pika/validators.py,sha256=iLw8dwbnNaEK3yd_TU-BplIpQ4jx4SMYSmTR-HikNc0,1419

View File

@@ -0,0 +1,5 @@
Wheel-Version: 1.0
Generator: bdist_wheel (0.40.0)
Root-Is-Purelib: true
Tag: py3-none-any

View File

@@ -0,0 +1,22 @@
__version__ = '1.3.2'
import logging
# Add NullHandler before importing Pika modules to prevent logging warnings
logging.getLogger(__name__).addHandler(logging.NullHandler())
# pylint: disable=C0413
from pika.connection import ConnectionParameters
from pika.connection import URLParameters
from pika.connection import SSLOptions
from pika.credentials import PlainCredentials
from pika.spec import BasicProperties
from pika.delivery_mode import DeliveryMode
from pika import adapters
from pika.adapters import BaseConnection
from pika.adapters import BlockingConnection
from pika.adapters import SelectConnection
from pika.adapters.utils.connection_workflow import AMQPConnectionWorkflow

View File

@@ -0,0 +1,23 @@
"""
Connection Adapters
===================
Pika provides multiple adapters to connect to RabbitMQ:
- adapters.asyncio_connection.AsyncioConnection: Native Python3 AsyncIO use
- adapters.blocking_connection.BlockingConnection: Enables blocking,
synchronous operation on top of library for simple uses.
- adapters.gevent_connection.GeventConnection: Connection adapter for use with
Gevent.
- adapters.select_connection.SelectConnection: A native event based connection
adapter that implements select, kqueue, poll and epoll.
- adapters.tornado_connection.TornadoConnection: Connection adapter for use
with the Tornado web framework.
- adapters.twisted_connection.TwistedProtocolConnection: Connection adapter for
use with the Twisted framework
"""
from pika.adapters.base_connection import BaseConnection
from pika.adapters.blocking_connection import BlockingConnection
from pika.adapters.select_connection import SelectConnection
from pika.adapters.select_connection import IOLoop

View File

@@ -0,0 +1,283 @@
"""Use pika with the Asyncio EventLoop"""
import asyncio
import logging
import sys
from pika.adapters import base_connection
from pika.adapters.utils import nbio_interface, io_services_utils
LOGGER = logging.getLogger(__name__)
if sys.platform == 'win32':
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
class AsyncioConnection(base_connection.BaseConnection):
""" The AsyncioConnection runs on the Asyncio EventLoop.
"""
def __init__(self,
parameters=None,
on_open_callback=None,
on_open_error_callback=None,
on_close_callback=None,
custom_ioloop=None,
internal_connection_workflow=True):
""" Create a new instance of the AsyncioConnection class, connecting
to RabbitMQ automatically
:param pika.connection.Parameters parameters: Connection parameters
:param callable on_open_callback: The method to call when the connection
is open
:param None | method on_open_error_callback: Called if the connection
can't be established or connection establishment is interrupted by
`Connection.close()`: on_open_error_callback(Connection, exception).
:param None | method on_close_callback: Called when a previously fully
open connection is closed:
`on_close_callback(Connection, exception)`, where `exception` is
either an instance of `exceptions.ConnectionClosed` if closed by
user or broker or exception of another type that describes the cause
of connection failure.
:param None | asyncio.AbstractEventLoop |
nbio_interface.AbstractIOServices custom_ioloop:
Defaults to asyncio.get_event_loop().
:param bool internal_connection_workflow: True for autonomous connection
establishment which is default; False for externally-managed
connection workflow via the `create_connection()` factory.
"""
if isinstance(custom_ioloop, nbio_interface.AbstractIOServices):
nbio = custom_ioloop
else:
nbio = _AsyncioIOServicesAdapter(custom_ioloop)
super().__init__(
parameters,
on_open_callback,
on_open_error_callback,
on_close_callback,
nbio,
internal_connection_workflow=internal_connection_workflow)
@classmethod
def create_connection(cls,
connection_configs,
on_done,
custom_ioloop=None,
workflow=None):
"""Implement
:py:classmethod::`pika.adapters.BaseConnection.create_connection()`.
"""
nbio = _AsyncioIOServicesAdapter(custom_ioloop)
def connection_factory(params):
"""Connection factory."""
if params is None:
raise ValueError('Expected pika.connection.Parameters '
'instance, but got None in params arg.')
return cls(
parameters=params,
custom_ioloop=nbio,
internal_connection_workflow=False)
return cls._start_connection_workflow(
connection_configs=connection_configs,
connection_factory=connection_factory,
nbio=nbio,
workflow=workflow,
on_done=on_done)
class _AsyncioIOServicesAdapter(io_services_utils.SocketConnectionMixin,
io_services_utils.StreamingConnectionMixin,
nbio_interface.AbstractIOServices,
nbio_interface.AbstractFileDescriptorServices):
"""Implements
:py:class:`.utils.nbio_interface.AbstractIOServices` interface
on top of `asyncio`.
NOTE:
:py:class:`.utils.nbio_interface.AbstractFileDescriptorServices`
interface is only required by the mixins.
"""
def __init__(self, loop=None):
"""
:param asyncio.AbstractEventLoop | None loop: If None, gets default
event loop from asyncio.
"""
self._loop = loop or asyncio.get_event_loop()
def get_native_ioloop(self):
"""Implement
:py:meth:`.utils.nbio_interface.AbstractIOServices.get_native_ioloop()`.
"""
return self._loop
def close(self):
"""Implement
:py:meth:`.utils.nbio_interface.AbstractIOServices.close()`.
"""
self._loop.close()
def run(self):
"""Implement :py:meth:`.utils.nbio_interface.AbstractIOServices.run()`.
"""
self._loop.run_forever()
def stop(self):
"""Implement :py:meth:`.utils.nbio_interface.AbstractIOServices.stop()`.
"""
self._loop.stop()
def add_callback_threadsafe(self, callback):
"""Implement
:py:meth:`.utils.nbio_interface.AbstractIOServices.add_callback_threadsafe()`.
"""
self._loop.call_soon_threadsafe(callback)
def call_later(self, delay, callback):
"""Implement
:py:meth:`.utils.nbio_interface.AbstractIOServices.call_later()`.
"""
return _TimerHandle(self._loop.call_later(delay, callback))
def getaddrinfo(self,
host,
port,
on_done,
family=0,
socktype=0,
proto=0,
flags=0):
"""Implement
:py:meth:`.utils.nbio_interface.AbstractIOServices.getaddrinfo()`.
"""
return self._schedule_and_wrap_in_io_ref(
self._loop.getaddrinfo(
host,
port,
family=family,
type=socktype,
proto=proto,
flags=flags), on_done)
def set_reader(self, fd, on_readable):
"""Implement
:py:meth:`.utils.nbio_interface.AbstractFileDescriptorServices.set_reader()`.
"""
self._loop.add_reader(fd, on_readable)
LOGGER.debug('set_reader(%s, _)', fd)
def remove_reader(self, fd):
"""Implement
:py:meth:`.utils.nbio_interface.AbstractFileDescriptorServices.remove_reader()`.
"""
LOGGER.debug('remove_reader(%s)', fd)
return self._loop.remove_reader(fd)
def set_writer(self, fd, on_writable):
"""Implement
:py:meth:`.utils.nbio_interface.AbstractFileDescriptorServices.set_writer()`.
"""
self._loop.add_writer(fd, on_writable)
LOGGER.debug('set_writer(%s, _)', fd)
def remove_writer(self, fd):
"""Implement
:py:meth:`.utils.nbio_interface.AbstractFileDescriptorServices.remove_writer()`.
"""
LOGGER.debug('remove_writer(%s)', fd)
return self._loop.remove_writer(fd)
def _schedule_and_wrap_in_io_ref(self, coro, on_done):
"""Schedule the coroutine to run and return _AsyncioIOReference
:param coroutine-obj coro:
:param callable on_done: user callback that takes the completion result
or exception as its only arg. It will not be called if the operation
was cancelled.
:rtype: _AsyncioIOReference which is derived from
nbio_interface.AbstractIOReference
"""
if not callable(on_done):
raise TypeError(
'on_done arg must be callable, but got {!r}'.format(on_done))
return _AsyncioIOReference(
asyncio.ensure_future(coro, loop=self._loop), on_done)
class _TimerHandle(nbio_interface.AbstractTimerReference):
"""This module's adaptation of `nbio_interface.AbstractTimerReference`.
"""
def __init__(self, handle):
"""
:param asyncio.Handle handle:
"""
self._handle = handle
def cancel(self):
if self._handle is not None:
self._handle.cancel()
self._handle = None
class _AsyncioIOReference(nbio_interface.AbstractIOReference):
"""This module's adaptation of `nbio_interface.AbstractIOReference`.
"""
def __init__(self, future, on_done):
"""
:param asyncio.Future future:
:param callable on_done: user callback that takes the completion result
or exception as its only arg. It will not be called if the operation
was cancelled.
"""
if not callable(on_done):
raise TypeError(
'on_done arg must be callable, but got {!r}'.format(on_done))
self._future = future
def on_done_adapter(future):
"""Handle completion callback from the future instance"""
# NOTE: Asyncio schedules callback for cancelled futures, but pika
# doesn't want that
if not future.cancelled():
on_done(future.exception() or future.result())
future.add_done_callback(on_done_adapter)
def cancel(self):
"""Cancel pending operation
:returns: False if was already done or cancelled; True otherwise
:rtype: bool
"""
return self._future.cancel()

View File

@@ -0,0 +1,501 @@
"""Base class extended by connection adapters. This extends the
connection.Connection class to encapsulate connection behavior but still
isolate socket and low level communication.
"""
import abc
import functools
import logging
import pika.compat
import pika.exceptions
import pika.tcp_socket_opts
from pika.adapters.utils import connection_workflow, nbio_interface
from pika import connection
LOGGER = logging.getLogger(__name__)
class BaseConnection(connection.Connection):
"""BaseConnection class that should be extended by connection adapters.
This class abstracts I/O loop and transport services from pika core.
"""
def __init__(self, parameters, on_open_callback, on_open_error_callback,
on_close_callback, nbio, internal_connection_workflow):
"""Create a new instance of the Connection object.
:param None|pika.connection.Parameters parameters: Connection parameters
:param None|method on_open_callback: Method to call on connection open
:param None | method on_open_error_callback: Called if the connection
can't be established or connection establishment is interrupted by
`Connection.close()`: on_open_error_callback(Connection, exception).
:param None | method on_close_callback: Called when a previously fully
open connection is closed:
`on_close_callback(Connection, exception)`, where `exception` is
either an instance of `exceptions.ConnectionClosed` if closed by
user or broker or exception of another type that describes the cause
of connection failure.
:param pika.adapters.utils.nbio_interface.AbstractIOServices nbio:
asynchronous services
:param bool internal_connection_workflow: True for autonomous connection
establishment which is default; False for externally-managed
connection workflow via the `create_connection()` factory.
:raises: RuntimeError
:raises: ValueError
"""
if parameters and not isinstance(parameters, connection.Parameters):
raise ValueError(
'Expected instance of Parameters, not %r' % (parameters,))
self._nbio = nbio
self._connection_workflow = None # type: connection_workflow.AMQPConnectionWorkflow
self._transport = None # type: pika.adapters.utils.nbio_interface.AbstractStreamTransport
self._got_eof = False # transport indicated EOF (connection reset)
super(BaseConnection, self).__init__(
parameters,
on_open_callback,
on_open_error_callback,
on_close_callback,
internal_connection_workflow=internal_connection_workflow)
def _init_connection_state(self):
"""Initialize or reset all of our internal state variables for a given
connection. If we disconnect and reconnect, all of our state needs to
be wiped.
"""
super(BaseConnection, self)._init_connection_state()
self._connection_workflow = None
self._transport = None
self._got_eof = False
def __repr__(self):
# def get_socket_repr(sock):
# """Return socket info suitable for use in repr"""
# if sock is None:
# return None
#
# sockname = None
# peername = None
# try:
# sockname = sock.getsockname()
# except pika.compat.SOCKET_ERROR:
# # closed?
# pass
# else:
# try:
# peername = sock.getpeername()
# except pika.compat.SOCKET_ERROR:
# # not connected?
# pass
#
# return '%s->%s' % (sockname, peername)
# TODO need helpful __repr__ in transports
return ('<%s %s transport=%s params=%s>' % (
self.__class__.__name__, self._STATE_NAMES[self.connection_state],
self._transport, self.params))
@classmethod
@abc.abstractmethod
def create_connection(cls,
connection_configs,
on_done,
custom_ioloop=None,
workflow=None):
"""Asynchronously create a connection to an AMQP broker using the given
configurations. Will attempt to connect using each config in the given
order, including all compatible resolved IP addresses of the hostname
supplied in each config, until one is established or all attempts fail.
See also `_start_connection_workflow()`.
:param sequence connection_configs: A sequence of one or more
`pika.connection.Parameters`-based objects.
:param callable on_done: as defined in
`connection_workflow.AbstractAMQPConnectionWorkflow.start()`.
:param object | None custom_ioloop: Provide a custom I/O loop that is
native to the specific adapter implementation; if None, the adapter
will use a default loop instance, which is typically a singleton.
:param connection_workflow.AbstractAMQPConnectionWorkflow | None workflow:
Pass an instance of an implementation of the
`connection_workflow.AbstractAMQPConnectionWorkflow` interface;
defaults to a `connection_workflow.AMQPConnectionWorkflow` instance
with default values for optional args.
:returns: Connection workflow instance in use. The user should limit
their interaction with this object only to it's `abort()` method.
:rtype: connection_workflow.AbstractAMQPConnectionWorkflow
"""
raise NotImplementedError
@classmethod
def _start_connection_workflow(cls, connection_configs, connection_factory,
nbio, workflow, on_done):
"""Helper function for custom implementations of `create_connection()`.
:param sequence connection_configs: A sequence of one or more
`pika.connection.Parameters`-based objects.
:param callable connection_factory: A function that takes
`pika.connection.Parameters` as its only arg and returns a brand new
`pika.connection.Connection`-based adapter instance each time it is
called. The factory must instantiate the connection with
`internal_connection_workflow=False`.
:param pika.adapters.utils.nbio_interface.AbstractIOServices nbio:
:param connection_workflow.AbstractAMQPConnectionWorkflow | None workflow:
Pass an instance of an implementation of the
`connection_workflow.AbstractAMQPConnectionWorkflow` interface;
defaults to a `connection_workflow.AMQPConnectionWorkflow` instance
with default values for optional args.
:param callable on_done: as defined in
:py:meth:`connection_workflow.AbstractAMQPConnectionWorkflow.start()`.
:returns: Connection workflow instance in use. The user should limit
their interaction with this object only to it's `abort()` method.
:rtype: connection_workflow.AbstractAMQPConnectionWorkflow
"""
if workflow is None:
workflow = connection_workflow.AMQPConnectionWorkflow()
LOGGER.debug('Created default connection workflow %r', workflow)
if isinstance(workflow, connection_workflow.AMQPConnectionWorkflow):
workflow.set_io_services(nbio)
def create_connector():
"""`AMQPConnector` factory."""
return connection_workflow.AMQPConnector(
lambda params: _StreamingProtocolShim(
connection_factory(params)),
nbio)
workflow.start(
connection_configs=connection_configs,
connector_factory=create_connector,
native_loop=nbio.get_native_ioloop(),
on_done=functools.partial(cls._unshim_connection_workflow_callback,
on_done))
return workflow
@property
def ioloop(self):
"""
:returns: the native I/O loop instance underlying async services selected
by user or the default selected by the specialized connection
adapter (e.g., Twisted reactor, `asyncio.SelectorEventLoop`,
`select_connection.IOLoop`, etc.)
:rtype: object
"""
return self._nbio.get_native_ioloop()
def _adapter_call_later(self, delay, callback):
"""Implement
:py:meth:`pika.connection.Connection._adapter_call_later()`.
"""
return self._nbio.call_later(delay, callback)
def _adapter_remove_timeout(self, timeout_id):
"""Implement
:py:meth:`pika.connection.Connection._adapter_remove_timeout()`.
"""
timeout_id.cancel()
def _adapter_add_callback_threadsafe(self, callback):
"""Implement
:py:meth:`pika.connection.Connection._adapter_add_callback_threadsafe()`.
"""
if not callable(callback):
raise TypeError(
'callback must be a callable, but got %r' % (callback,))
self._nbio.add_callback_threadsafe(callback)
def _adapter_connect_stream(self):
"""Initiate full-stack connection establishment asynchronously for
internally-initiated connection bring-up.
Upon failed completion, we will invoke
`Connection._on_stream_terminated()`. NOTE: On success,
the stack will be up already, so there is no corresponding callback.
"""
self._connection_workflow = connection_workflow.AMQPConnectionWorkflow(
_until_first_amqp_attempt=True)
self._connection_workflow.set_io_services(self._nbio)
def create_connector():
"""`AMQPConnector` factory"""
return connection_workflow.AMQPConnector(
lambda _params: _StreamingProtocolShim(self), self._nbio)
self._connection_workflow.start(
[self.params],
connector_factory=create_connector,
native_loop=self._nbio.get_native_ioloop(),
on_done=functools.partial(self._unshim_connection_workflow_callback,
self._on_connection_workflow_done))
@staticmethod
def _unshim_connection_workflow_callback(user_on_done, shim_or_exc):
"""
:param callable user_on_done: user's `on_done` callback as defined in
:py:meth:`connection_workflow.AbstractAMQPConnectionWorkflow.start()`.
:param _StreamingProtocolShim | Exception shim_or_exc:
"""
result = shim_or_exc
if isinstance(result, _StreamingProtocolShim):
result = result.conn
user_on_done(result)
def _abort_connection_workflow(self):
"""Asynchronously abort connection workflow. Upon
completion, `Connection._on_stream_terminated()` will be called with None
as the error argument.
Assumption: may be called only while connection is opening.
"""
assert not self._opened, (
'_abort_connection_workflow() may be called only when '
'connection is opening.')
if self._transport is None:
# NOTE: this is possible only when user calls Connection.close() to
# interrupt internally-initiated connection establishment.
# self._connection_workflow.abort() would not call
# Connection.close() before pairing of connection with transport.
assert self._internal_connection_workflow, (
'Unexpected _abort_connection_workflow() call with '
'no transport in external connection workflow mode.')
# This will result in call to _on_connection_workflow_done() upon
# completion
self._connection_workflow.abort()
else:
# NOTE: we can't use self._connection_workflow.abort() in this case,
# because it would result in infinite recursion as we're called
# from Connection.close() and _connection_workflow.abort() calls
# Connection.close() to abort a connection that's already been
# paired with a transport. During internally-initiated connection
# establishment, AMQPConnectionWorkflow will discover that user
# aborted the connection when it receives
# pika.exceptions.ConnectionOpenAborted.
# This completes asynchronously, culminating in call to our method
# `connection_lost()`
self._transport.abort()
def _on_connection_workflow_done(self, conn_or_exc):
"""`AMQPConnectionWorkflow` completion callback.
:param BaseConnection | Exception conn_or_exc: Our own connection
instance on success; exception on failure. See
`AbstractAMQPConnectionWorkflow.start()` for details.
"""
LOGGER.debug('Full-stack connection workflow completed: %r',
conn_or_exc)
self._connection_workflow = None
# Notify protocol of failure
if isinstance(conn_or_exc, Exception):
self._transport = None
if isinstance(conn_or_exc,
connection_workflow.AMQPConnectionWorkflowAborted):
LOGGER.info('Full-stack connection workflow aborted: %r',
conn_or_exc)
# So that _handle_connection_workflow_failure() will know it's
# not a failure
conn_or_exc = None
else:
LOGGER.error('Full-stack connection workflow failed: %r',
conn_or_exc)
if (isinstance(conn_or_exc,
connection_workflow.AMQPConnectionWorkflowFailed)
and isinstance(
conn_or_exc.exceptions[-1], connection_workflow.
AMQPConnectorSocketConnectError)):
conn_or_exc = pika.exceptions.AMQPConnectionError(
conn_or_exc)
self._handle_connection_workflow_failure(conn_or_exc)
else:
# NOTE: On success, the stack will be up already, so there is no
# corresponding callback.
assert conn_or_exc is self, \
'Expected self conn={!r} from workflow, but got {!r}.'.format(
self, conn_or_exc)
def _handle_connection_workflow_failure(self, error):
"""Handle failure of self-initiated stack bring-up and call
`Connection._on_stream_terminated()` if connection is not in closed state
yet. Called by adapter layer when the full-stack connection workflow
fails.
:param Exception | None error: exception instance describing the reason
for failure or None if the connection workflow was aborted.
"""
if error is None:
LOGGER.info('Self-initiated stack bring-up aborted.')
else:
LOGGER.error('Self-initiated stack bring-up failed: %r', error)
if not self.is_closed:
self._on_stream_terminated(error)
else:
# This may happen when AMQP layer bring up was started but did not
# complete
LOGGER.debug('_handle_connection_workflow_failure(): '
'suppressing - connection already closed.')
def _adapter_disconnect_stream(self):
"""Asynchronously bring down the streaming transport layer and invoke
`Connection._on_stream_terminated()` asynchronously when complete.
"""
if not self._opened:
self._abort_connection_workflow()
else:
# This completes asynchronously, culminating in call to our method
# `connection_lost()`
self._transport.abort()
def _adapter_emit_data(self, data):
"""Take ownership of data and send it to AMQP server as soon as
possible.
:param bytes data:
"""
self._transport.write(data)
def _proto_connection_made(self, transport):
"""Introduces transport to protocol after transport is connected.
:py:class:`.utils.nbio_interface.AbstractStreamProtocol` implementation.
:param nbio_interface.AbstractStreamTransport transport:
:raises Exception: Exception-based exception on error
"""
self._transport = transport
# Let connection know that stream is available
self._on_stream_connected()
def _proto_connection_lost(self, error):
"""Called upon loss or closing of TCP connection.
:py:class:`.utils.nbio_interface.AbstractStreamProtocol` implementation.
NOTE: `connection_made()` and `connection_lost()` are each called just
once and in that order. All other callbacks are called between them.
:param BaseException | None error: An exception (check for
`BaseException`) indicates connection failure. None indicates that
connection was closed on this side, such as when it's aborted or
when `AbstractStreamProtocol.eof_received()` returns a falsy result.
:raises Exception: Exception-based exception on error
"""
self._transport = None
if error is None:
# Either result of `eof_received()` or abort
if self._got_eof:
error = pika.exceptions.StreamLostError(
'Transport indicated EOF')
else:
error = pika.exceptions.StreamLostError(
'Stream connection lost: {!r}'.format(error))
LOGGER.log(logging.DEBUG if error is None else logging.ERROR,
'connection_lost: %r', error)
self._on_stream_terminated(error)
def _proto_eof_received(self): # pylint: disable=R0201
"""Called after the remote peer shuts its write end of the connection.
:py:class:`.utils.nbio_interface.AbstractStreamProtocol` implementation.
:returns: A falsy value (including None) will cause the transport to
close itself, resulting in an eventual `connection_lost()` call
from the transport. If a truthy value is returned, it will be the
protocol's responsibility to close/abort the transport.
:rtype: falsy|truthy
:raises Exception: Exception-based exception on error
"""
LOGGER.error('Transport indicated EOF.')
self._got_eof = True
# This is how a reset connection will typically present itself
# when we have nothing to send to the server over plaintext stream.
#
# Have transport tear down the connection and invoke our
# `connection_lost` method
return False
def _proto_data_received(self, data):
"""Called to deliver incoming data from the server to the protocol.
:py:class:`.utils.nbio_interface.AbstractStreamProtocol` implementation.
:param data: Non-empty data bytes.
:raises Exception: Exception-based exception on error
"""
self._on_data_available(data)
class _StreamingProtocolShim(nbio_interface.AbstractStreamProtocol):
"""Shim for callbacks from transport so that we BaseConnection can
delegate to private methods, thus avoiding contamination of API with
methods that look public, but aren't.
"""
# Override AbstractStreamProtocol abstract methods to enable instantiation
connection_made = None
connection_lost = None
eof_received = None
data_received = None
def __init__(self, conn):
"""
:param BaseConnection conn:
"""
self.conn = conn
# pylint: disable=W0212
self.connection_made = conn._proto_connection_made
self.connection_lost = conn._proto_connection_lost
self.eof_received = conn._proto_eof_received
self.data_received = conn._proto_data_received
def __getattr__(self, attr):
"""Proxy inexistent attribute requests to our connection instance
so that AMQPConnectionWorkflow/AMQPConnector may treat the shim as an
actual connection.
"""
return getattr(self.conn, attr)
def __repr__(self):
return '{}: {!r}'.format(self.__class__.__name__, self.conn)

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,469 @@
"""Use pika with the Gevent IOLoop."""
import functools
import logging
import os
import threading
import weakref
try:
import queue
except ImportError: # Python <= v2.7
import Queue as queue
import gevent
import gevent.hub
import gevent.socket
import pika.compat
from pika.adapters.base_connection import BaseConnection
from pika.adapters.utils.io_services_utils import check_callback_arg
from pika.adapters.utils.nbio_interface import (
AbstractIOReference,
AbstractIOServices,
)
from pika.adapters.utils.selector_ioloop_adapter import (
AbstractSelectorIOLoop,
SelectorIOServicesAdapter,
)
LOGGER = logging.getLogger(__name__)
class GeventConnection(BaseConnection):
"""Implementation of pika's ``BaseConnection``.
An async selector-based connection which integrates with Gevent.
"""
def __init__(self,
parameters=None,
on_open_callback=None,
on_open_error_callback=None,
on_close_callback=None,
custom_ioloop=None,
internal_connection_workflow=True):
"""Create a new GeventConnection instance and connect to RabbitMQ on
Gevent's event-loop.
:param pika.connection.Parameters|None parameters: The connection
parameters
:param callable|None on_open_callback: The method to call when the
connection is open
:param callable|None on_open_error_callback: Called if the connection
can't be established or connection establishment is interrupted by
`Connection.close()`:
on_open_error_callback(Connection, exception)
:param callable|None on_close_callback: Called when a previously fully
open connection is closed:
`on_close_callback(Connection, exception)`, where `exception` is
either an instance of `exceptions.ConnectionClosed` if closed by
user or broker or exception of another type that describes the
cause of connection failure
:param gevent._interfaces.ILoop|nbio_interface.AbstractIOServices|None
custom_ioloop: Use a custom Gevent ILoop.
:param bool internal_connection_workflow: True for autonomous connection
establishment which is default; False for externally-managed
connection workflow via the `create_connection()` factory
"""
if pika.compat.ON_WINDOWS:
raise RuntimeError('GeventConnection is not supported on Windows.')
custom_ioloop = (custom_ioloop or
_GeventSelectorIOLoop(gevent.get_hub()))
if isinstance(custom_ioloop, AbstractIOServices):
nbio = custom_ioloop
else:
nbio = _GeventSelectorIOServicesAdapter(custom_ioloop)
super(GeventConnection, self).__init__(
parameters,
on_open_callback,
on_open_error_callback,
on_close_callback,
nbio,
internal_connection_workflow=internal_connection_workflow)
@classmethod
def create_connection(cls,
connection_configs,
on_done,
custom_ioloop=None,
workflow=None):
"""Implement
:py:classmethod::`pika.adapters.BaseConnection.create_connection()`.
"""
custom_ioloop = (custom_ioloop or
_GeventSelectorIOLoop(gevent.get_hub()))
nbio = _GeventSelectorIOServicesAdapter(custom_ioloop)
def connection_factory(params):
"""Connection factory."""
if params is None:
raise ValueError('Expected pika.connection.Parameters '
'instance, but got None in params arg.')
return cls(parameters=params,
custom_ioloop=nbio,
internal_connection_workflow=False)
return cls._start_connection_workflow(
connection_configs=connection_configs,
connection_factory=connection_factory,
nbio=nbio,
workflow=workflow,
on_done=on_done)
class _TSafeCallbackQueue(object):
"""Dispatch callbacks from any thread to be executed in the main thread
efficiently with IO events.
"""
def __init__(self):
"""
:param _GeventSelectorIOLoop loop: IO loop to add callbacks to.
"""
# Thread-safe, blocking queue.
self._queue = queue.Queue()
# PIPE to trigger an event when the queue is ready.
self._read_fd, self._write_fd = os.pipe()
# Lock around writes to the PIPE in case some platform/implementation
# requires this.
self._write_lock = threading.RLock()
@property
def fd(self):
"""The file-descriptor to register for READ events in the IO loop."""
return self._read_fd
def add_callback_threadsafe(self, callback):
"""Add an item to the queue from any thread. The configured handler
will be invoked with the item in the main thread.
:param item: Object to add to the queue.
"""
self._queue.put(callback)
with self._write_lock:
# The value written is not important.
os.write(self._write_fd, b'\xFF')
def run_next_callback(self):
"""Invoke the next callback from the queue.
MUST run in the main thread. If no callback was added to the queue,
this will block the IO loop.
Performs a blocking READ on the pipe so must only be called when the
pipe is ready for reading.
"""
try:
callback = self._queue.get_nowait()
except queue.Empty:
# Should never happen.
LOGGER.warning("Callback queue was empty.")
else:
# Read the byte from the pipe so the event doesn't re-fire.
os.read(self._read_fd, 1)
callback()
class _GeventSelectorIOLoop(AbstractSelectorIOLoop):
"""Implementation of `AbstractSelectorIOLoop` using the Gevent event loop.
Required by implementations of `SelectorIOServicesAdapter`.
"""
# Gevent's READ and WRITE masks are defined as 1 and 2 respectively. No
# ERROR mask is defined.
# See http://www.gevent.org/api/gevent.hub.html#gevent._interfaces.ILoop.io
READ = 1
WRITE = 2
ERROR = 0
def __init__(self, gevent_hub=None):
"""
:param gevent._interfaces.ILoop gevent_loop:
"""
self._hub = gevent_hub or gevent.get_hub()
self._io_watchers_by_fd = {}
# Used to start/stop the loop.
self._waiter = gevent.hub.Waiter()
# For adding callbacks from other threads. See `add_callback(..)`.
self._callback_queue = _TSafeCallbackQueue()
def run_callback_in_main_thread(fd, events):
"""Swallow the fd and events arguments."""
del fd
del events
self._callback_queue.run_next_callback()
self.add_handler(self._callback_queue.fd, run_callback_in_main_thread,
self.READ)
def close(self):
"""Release the loop's resources."""
self._hub.loop.destroy()
self._hub = None
def start(self):
"""Run the I/O loop. It will loop until requested to exit. See `stop()`.
"""
LOGGER.debug("Passing control to Gevent's IOLoop")
self._waiter.get() # Block until 'stop()' is called.
LOGGER.debug("Control was passed back from Gevent's IOLoop")
self._waiter.clear()
def stop(self):
"""Request exit from the ioloop. The loop is NOT guaranteed to
stop before this method returns.
To invoke `stop()` safely from a thread other than this IOLoop's thread,
call it via `add_callback_threadsafe`; e.g.,
`ioloop.add_callback(ioloop.stop)`
"""
self._waiter.switch(None)
def add_callback(self, callback):
"""Requests a call to the given function as soon as possible in the
context of this IOLoop's thread.
NOTE: This is the only thread-safe method in IOLoop. All other
manipulations of IOLoop must be performed from the IOLoop's thread.
For example, a thread may request a call to the `stop` method of an
ioloop that is running in a different thread via
`ioloop.add_callback_threadsafe(ioloop.stop)`
:param callable callback: The callback method
"""
if gevent.get_hub() == self._hub:
# We're in the main thread; just add the callback.
LOGGER.debug("Adding callback from main thread")
self._hub.loop.run_callback(callback)
else:
# This isn't the main thread and Gevent's hub/loop don't provide
# any thread-safety so enqueue the callback for it to be registered
# in the main thread.
LOGGER.debug("Adding callback from another thread")
callback = functools.partial(self._hub.loop.run_callback, callback)
self._callback_queue.add_callback_threadsafe(callback)
def call_later(self, delay, callback):
"""Add the callback to the IOLoop timer to be called after delay seconds
from the time of call on best-effort basis. Returns a handle to the
timeout.
:param float delay: The number of seconds to wait to call callback
:param callable callback: The callback method
:returns: handle to the created timeout that may be passed to
`remove_timeout()`
:rtype: object
"""
timer = self._hub.loop.timer(delay)
timer.start(callback)
return timer
def remove_timeout(self, timeout_handle):
"""Remove a timeout
:param timeout_handle: Handle of timeout to remove
"""
timeout_handle.close()
def add_handler(self, fd, handler, events):
"""Start watching the given file descriptor for events
:param int fd: The file descriptor
:param callable handler: When requested event(s) occur,
`handler(fd, events)` will be called.
:param int events: The event mask (READ|WRITE)
"""
io_watcher = self._hub.loop.io(fd, events)
self._io_watchers_by_fd[fd] = io_watcher
io_watcher.start(handler, fd, events)
def update_handler(self, fd, events):
"""Change the events being watched for.
:param int fd: The file descriptor
:param int events: The new event mask (READ|WRITE)
"""
io_watcher = self._io_watchers_by_fd[fd]
# Save callback from the original watcher. The close the old watcher
# and create a new one using the saved callback and the new events.
callback = io_watcher.callback
io_watcher.close()
del self._io_watchers_by_fd[fd]
self.add_handler(fd, callback, events)
def remove_handler(self, fd):
"""Stop watching the given file descriptor for events
:param int fd: The file descriptor
"""
io_watcher = self._io_watchers_by_fd[fd]
io_watcher.close()
del self._io_watchers_by_fd[fd]
class _GeventSelectorIOServicesAdapter(SelectorIOServicesAdapter):
"""SelectorIOServicesAdapter implementation using Gevent's DNS resolver."""
def getaddrinfo(self,
host,
port,
on_done,
family=0,
socktype=0,
proto=0,
flags=0):
"""Implement :py:meth:`.nbio_interface.AbstractIOServices.getaddrinfo()`.
"""
resolver = _GeventAddressResolver(native_loop=self._loop,
host=host,
port=port,
family=family,
socktype=socktype,
proto=proto,
flags=flags,
on_done=on_done)
resolver.start()
# Return needs an implementation of `AbstractIOReference`.
return _GeventIOLoopIOHandle(resolver)
class _GeventIOLoopIOHandle(AbstractIOReference):
"""Implement `AbstractIOReference`.
Only used to wrap the _GeventAddressResolver.
"""
def __init__(self, subject):
"""
:param subject: subject of the reference containing a `cancel()` method
"""
self._cancel = subject.cancel
def cancel(self):
"""Cancel pending operation
:returns: False if was already done or cancelled; True otherwise
:rtype: bool
"""
return self._cancel()
class _GeventAddressResolver(object):
"""Performs getaddrinfo asynchronously Gevent's configured resolver in a
separate greenlet and invoking the provided callback with the result.
See: http://www.gevent.org/dns.html
"""
__slots__ = (
'_loop',
'_on_done',
'_greenlet',
# getaddrinfo(..) args:
'_ga_host',
'_ga_port',
'_ga_family',
'_ga_socktype',
'_ga_proto',
'_ga_flags')
def __init__(self, native_loop, host, port, family, socktype, proto, flags,
on_done):
"""Initialize the `_GeventAddressResolver`.
:param AbstractSelectorIOLoop native_loop:
:param host: `see socket.getaddrinfo()`
:param port: `see socket.getaddrinfo()`
:param family: `see socket.getaddrinfo()`
:param socktype: `see socket.getaddrinfo()`
:param proto: `see socket.getaddrinfo()`
:param flags: `see socket.getaddrinfo()`
:param on_done: on_done(records|BaseException) callback for reporting
result from the given I/O loop. The single arg will be either an
exception object (check for `BaseException`) in case of failure or
the result returned by `socket.getaddrinfo()`.
"""
check_callback_arg(on_done, 'on_done')
self._loop = native_loop
self._on_done = on_done
# Reference to the greenlet performing `getaddrinfo`.
self._greenlet = None
# getaddrinfo(..) args.
self._ga_host = host
self._ga_port = port
self._ga_family = family
self._ga_socktype = socktype
self._ga_proto = proto
self._ga_flags = flags
def start(self):
"""Start an asynchronous getaddrinfo invocation."""
if self._greenlet is None:
self._greenlet = gevent.spawn_raw(self._resolve)
else:
LOGGER.warning("_GeventAddressResolver already started")
def cancel(self):
"""Cancel the pending resolver."""
changed = False
if self._greenlet is not None:
changed = True
self._stop_greenlet()
self._cleanup()
return changed
def _cleanup(self):
"""Stop the resolver and release any resources."""
self._stop_greenlet()
self._loop = None
self._on_done = None
def _stop_greenlet(self):
"""Stop the greenlet performing getaddrinfo if running.
Otherwise, this is a no-op.
"""
if self._greenlet is not None:
gevent.kill(self._greenlet)
self._greenlet = None
def _resolve(self):
"""Call `getaddrinfo()` and return result via user's callback
function on the configured IO loop.
"""
try:
# NOTE(JG): Can't use kwargs with getaddrinfo on Python <= v2.7.
result = gevent.socket.getaddrinfo(self._ga_host, self._ga_port,
self._ga_family,
self._ga_socktype,
self._ga_proto, self._ga_flags)
except Exception as exc: # pylint: disable=broad-except
LOGGER.error('Address resolution failed: %r', exc)
result = exc
callback = functools.partial(self._dispatch_callback, result)
self._loop.add_callback(callback)
def _dispatch_callback(self, result):
"""Invoke the configured completion callback and any subsequent cleanup.
:param result: result from getaddrinfo, or the exception if raised.
"""
try:
LOGGER.debug(
'Invoking async getaddrinfo() completion callback; host=%r',
self._ga_host)
self._on_done(result)
finally:
self._cleanup()

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,90 @@
"""Use pika with the Tornado IOLoop
"""
import logging
from tornado import ioloop
from pika.adapters.utils import nbio_interface, selector_ioloop_adapter
from pika.adapters import base_connection
LOGGER = logging.getLogger(__name__)
class TornadoConnection(base_connection.BaseConnection):
"""The TornadoConnection runs on the Tornado IOLoop.
"""
def __init__(self,
parameters=None,
on_open_callback=None,
on_open_error_callback=None,
on_close_callback=None,
custom_ioloop=None,
internal_connection_workflow=True):
"""Create a new instance of the TornadoConnection class, connecting
to RabbitMQ automatically.
:param pika.connection.Parameters|None parameters: The connection
parameters
:param callable|None on_open_callback: The method to call when the
connection is open
:param callable|None on_open_error_callback: Called if the connection
can't be established or connection establishment is interrupted by
`Connection.close()`:
on_open_error_callback(Connection, exception)
:param callable|None on_close_callback: Called when a previously fully
open connection is closed:
`on_close_callback(Connection, exception)`, where `exception` is
either an instance of `exceptions.ConnectionClosed` if closed by
user or broker or exception of another type that describes the
cause of connection failure
:param ioloop.IOLoop|nbio_interface.AbstractIOServices|None custom_ioloop:
Override using the global IOLoop in Tornado
:param bool internal_connection_workflow: True for autonomous connection
establishment which is default; False for externally-managed
connection workflow via the `create_connection()` factory
"""
if isinstance(custom_ioloop, nbio_interface.AbstractIOServices):
nbio = custom_ioloop
else:
nbio = (selector_ioloop_adapter.SelectorIOServicesAdapter(
custom_ioloop or ioloop.IOLoop.instance()))
super(TornadoConnection, self).__init__(
parameters,
on_open_callback,
on_open_error_callback,
on_close_callback,
nbio,
internal_connection_workflow=internal_connection_workflow)
@classmethod
def create_connection(cls,
connection_configs,
on_done,
custom_ioloop=None,
workflow=None):
"""Implement
:py:classmethod::`pika.adapters.BaseConnection.create_connection()`.
"""
nbio = selector_ioloop_adapter.SelectorIOServicesAdapter(
custom_ioloop or ioloop.IOLoop.instance())
def connection_factory(params):
"""Connection factory."""
if params is None:
raise ValueError('Expected pika.connection.Parameters '
'instance, but got None in params arg.')
return cls(
parameters=params,
custom_ioloop=nbio,
internal_connection_workflow=False)
return cls._start_connection_workflow(
connection_configs=connection_configs,
connection_factory=connection_factory,
nbio=nbio,
workflow=workflow,
on_done=on_done)

File diff suppressed because it is too large Load Diff

Some files were not shown because too many files have changed in this diff Show More