Compare commits
27 Commits
yakovleva_ ... vaksman_va

| Author | SHA1 | Date |
| --- | --- | --- |
|  | 1f72d4dc70 |  |
|  | b351431f51 |  |
|  | 56baf52b61 |  |
|  | 8a96320fd5 |  |
|  | bd25930973 |  |
|  | 37996c249a |  |
|  | 9456d4fe01 |  |
|  | c14e105db5 |  |
|  | 4d1e900721 |  |
|  | 7184d6d728 |  |
|  | 6e7055efa4 |  |
|  | 9e40adc53c |  |
|  | 4a36528cc7 |  |
|  | ad3988e5fc |  |
|  | 780b4b2924 |  |
|  | 5047b16cde |  |
|  | 2b87427299 |  |
|  | 21cdd4971d |  |
|  | bdb5cc07ed |  |
|  | e761e33201 |  |
|  | ceee500b95 |  |
|  | 2be2c71b69 |  |
|  | aa8180ba49 |  |
|  | ba7480cb4f |  |
|  | 520337f92d |  |
|  | 06d1d8cdd4 |  |
|  | 1adaac9281 |  |
2
bazunov_andrew_lab_1/.gitignore
vendored
Normal file
@@ -0,0 +1,2 @@
ollama
./ollama
33
bazunov_andrew_lab_1/README.md
Normal file
@@ -0,0 +1,33 @@
# Distributed Computing and Applications, Lab 1

## _Author: Базунов Андрей Игревич, PIbd-42_

The following services were chosen:

1. Ollama (_a service for running LLM models_)
2. Open Web Ui (_a service for convenient chat with the model from the Ollama service_)
3. Gitea (_a Git service_)

# Docker

>Before running anything, install Docker and check the version:

```sh
docker-compose --version
```

>Next, configure the docker-compose.yaml file and start the containers:

```sh
docker-compose up -d
```

>To stop the containers, use:

```sh
docker-compose down
```

---

> Note: after starting the containers, enter the **ollama** container and install the [gemma2](https://ollama.com/library/gemma2:2b) model:
> ```sh
> docker-compose exec ollama ollama run gemma2:2b
> ```

---

After that, the Open Web Ui web service at **localhost:8080** can be used to chat with the model, and Gitea is available at **localhost:3000** - [working demo](https://vk.com/video/@viltskaa?z=video236673313_456239574%2Fpl_236673313_-2)
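A quick way to confirm the model actually landed is to query the Ollama HTTP API through the host port mapped in the compose file below (7869 on the host to 11434 in the container). A minimal sketch, assuming the stack is up; it uses the standard Ollama `/api/tags` endpoint and is illustrative rather than part of the lab:

```python
import json
import urllib.request

# docker-compose.yml maps host port 7869 to Ollama's API port 11434
OLLAMA_TAGS_URL = "http://localhost:7869/api/tags"  # lists locally installed models

with urllib.request.urlopen(OLLAMA_TAGS_URL) as response:
    models = json.load(response)["models"]

# gemma2:2b should appear here after the `ollama run gemma2:2b` step above
print([model["name"] for model in models])
```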
61
bazunov_andrew_lab_1/docker-compose.yml
Normal file
@@ -0,0 +1,61 @@
services:
  gitea: # service name
    image: gitea/gitea:latest # image name
    container_name: gitea # container name, can be arbitrary
    ports:
      - "3000:3000" # expose the Gitea port on the host
    volumes: # storage
      - data:/data
    environment: # environment variables
      USER_UID: 1000
      USER_GID: 1000

  ollama:
    image: ollama/ollama:latest
    container_name: ollama
    restart: always
    ports:
      - 7869:11434
    pull_policy: always
    tty: true
    volumes:
      - .:/code
      - ./ollama/ollama:/root/.ollama # directory for Ollama data
    environment:
      - OLLAMA_KEEP_ALIVE=24h
      - OLLAMA_HOST=0.0.0.0 # host for the Ollama API
    networks:
      - ollama-docker
    command: ["serve"] # run Ollama in server mode

  ollama-webui:
    image: ghcr.io/open-webui/open-webui:main # Open Web UI image
    container_name: ollama-webui
    restart: unless-stopped
    volumes:
      - ./ollama/ollama-webui:/app/backend/data
    ports:
      - 8080:8080 # web interface port
    environment: # https://docs.openwebui.com/getting-started/env-configuration#default_models
      - OLLAMA_BASE_URLS=http://host.docker.internal:7869
      - ENV=dev
      - WEBUI_AUTH=False
      - WEBUI_NAME=Viltskaa AI
      - WEBUI_URL=http://localhost:8080
      - WEBUI_SECRET_KEY=t0p-s3cr3t
    depends_on:
      - ollama
    extra_hosts:
      - host.docker.internal:host-gateway
    networks:
      - ollama-docker

networks:
  ollama-docker:
    external: false

volumes:
  ollama:
    driver: local
  data:
    driver: local
38
borschevskaya_anna_lab_2/.gitignore
vendored
Normal file
@@ -0,0 +1,38 @@
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/

### IntelliJ IDEA ###
.idea/modules.xml
.idea/jarRepositories.xml
.idea/compiler.xml
.idea/libraries/
*.iws
*.iml
*.ipr

### Eclipse ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache

### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/

### VS Code ###
.vscode/

### Mac OS ###
.DS_Store
43
borschevskaya_anna_lab_2/README.md
Normal file
@@ -0,0 +1,43 @@
|
# Report. Lab 2

Two services that work with text files were written for lab 2.
For the first service, task variant 5 was chosen:
```
Finds the file with the shortest name in the /var/data directory and moves it to /var/result/data.txt.
```
And for the second, variant 2:
```
Finds the smallest number in the file /var/data/data.txt and saves its cube to /var/result/result.txt.
```
## Description
First, the service first moves the data from the file with the shortest name in the mounted input directory into the output folder.
The second service has access to that output folder; it writes the smallest number from the file placed there by the first service, raised to the third power, into the output file.
The output file is located in a mounted directory and is accessible on the machine where the services run.

The Dockerfile uses a multi-stage build with a different base image at each stage.
The meaning of each line is described in the Dockerfile of the first service.

The docker-compose.yml file documents the new lines related to mounted volumes.
Note that the services "communicate" through the shared volume common, which is mounted into the containers at /var/result. This lets the results
of the first service be used by the second one.
## How to run
To start the services:
1. Install and start Docker Engine or Docker Desktop
2. In a terminal, change to the directory containing docker-compose.yml
3. Run:
```
docker compose up --build
```
If all containers start successfully, the console shows the following message:
```
✔ Network borschevskaya_anna_lab_2_default  Created  0.1s
✔ Container borschevskaya_anna_lab_2-first-1  Created  0.1s
✔ Container borschevskaya_anna_lab_2-second-1  Created  0.1s
Attaching to borschevskaya_anna_lab_2-first-1, borschevskaya_anna_lab_2-second-1
```
Each service then logs to its console how the file processing went.
If the INPUT_PATH and OUTPUT_PATH environment variables are not set, or in other exceptional situations, a message about it is printed.
## Video report
A demonstration of the lab is available in this [video](https://disk.yandex.ru/i/LFxdyRUFQDwXEQ).
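To make the two-stage contract concrete, here is a minimal Python sketch of the same logic for a dry run outside Docker. The directory names are hypothetical local stand-ins for the mounted paths; the actual services are the Java programs shown below:

```python
import pathlib
import shutil

data_dir = pathlib.Path("volumes/input")     # stand-in for /var/data
result_dir = pathlib.Path("volumes/output")  # stand-in for /var/result
result_dir.mkdir(parents=True, exist_ok=True)

# Stage 1 (service "first"): move the file with the shortest name to data.txt
shortest = min((p for p in data_dir.iterdir() if p.is_file()),
               key=lambda p: len(p.name))
shutil.move(shortest, result_dir / "data.txt")

# Stage 2 (service "second"): write the cube of the smallest number to result.txt
numbers = [int(token) for token in (result_dir / "data.txt").read_text().split()]
(result_dir / "result.txt").write_text(str(min(numbers) ** 3))
```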
22
borschevskaya_anna_lab_2/docker-compose.yml
Normal file
@@ -0,0 +1,22 @@
services:
  first:
    build: ./first # directory containing the Dockerfile for the first service
    environment:
      INPUT_PATH: /var/data/ # directory with input files to process
      OUTPUT_PATH: /var/result/ # directory for the processing results
    volumes:
      - ./volumes/input:/var/data # local folder with input data, mounted into the container
      - common:/var/result # shared volume where first puts its results
  second:
    build: ./second # directory containing the Dockerfile for the second service
    depends_on: # second depends on first and starts after it
      - first
    environment:
      INPUT_PATH: /var/result/
      OUTPUT_PATH: /var/data/
    volumes:
      - ./volumes/output:/var/data
      - common:/var/result # shared volume from which second reads first's results and runs its own logic

volumes:
  common:
25
borschevskaya_anna_lab_2/first/Dockerfile
Normal file
@@ -0,0 +1,25 @@
# Use a Maven image for the build stage
FROM maven:3.8-eclipse-temurin-21-alpine AS build

# Set the working directory
WORKDIR /app

# Copy only pom.xml and download the dependencies,
# so the dependency layers are cached and rebuilds after code changes are faster
COPY pom.xml .
RUN mvn dependency:go-offline

# Copy the remaining sources
COPY src ./src

# Build the whole project
RUN mvn clean package -DskipTests

# Use the official JDK image to run the built jar
FROM eclipse-temurin:21-jdk-alpine

# Copy the jar from the previous stage
COPY --from=build /app/target/*.jar /app.jar

# Command to start the application
CMD ["java", "-jar", "app.jar"]
37
borschevskaya_anna_lab_2/first/pom.xml
Normal file
@@ -0,0 +1,37 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>ru.first</groupId>
    <artifactId>first</artifactId>
    <version>1.0.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>21</maven.compiler.source>
        <maven.compiler.target>21</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <build>
        <plugins>
            <plugin>
                <!-- Build an executable JAR -->
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>3.1.0</version>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <classpathPrefix>lib/</classpathPrefix>
                            <mainClass>ru.first.Main</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>
@@ -0,0 +1,50 @@
package ru.first;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Comparator;

import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
import static java.util.Objects.isNull;

public class Main {

    public static final String INPUT_PATH = System.getenv("INPUT_PATH");
    public static final String OUTPUT_PATH = System.getenv("OUTPUT_PATH");
    public static final String RESULT_FILE_NAME = "data.txt";

    public static void main(String[] args) throws IOException {
        if (isNull(INPUT_PATH) || INPUT_PATH.isEmpty() || isNull(OUTPUT_PATH) || OUTPUT_PATH.isEmpty()) {
            System.out.printf("Missing environment variables INPUT_PATH = '%s' or OUTPUT_PATH = '%s'%n",
                    INPUT_PATH, OUTPUT_PATH);
            return;
        }
        var inputPathDir = Path.of(INPUT_PATH);
        if (!Files.exists(inputPathDir)) {
            Files.createDirectory(inputPathDir);
        }
        var inputDirectory = new File(INPUT_PATH);
        var allDirFiles = inputDirectory.listFiles();
        if (isNull(allDirFiles) || allDirFiles.length == 0) {
            System.out.println("The directory is empty");
            return;
        }
        var dirFiles = Arrays.stream(allDirFiles).filter(File::isFile).toList();
        if (dirFiles.isEmpty()) {
            System.out.println("The directory contains no files suitable for processing");
            return;
        }

        // file with the shortest name
        var shortestName = dirFiles.stream().min(Comparator.comparing(file -> file.getName().length())).get();

        var outputPathDir = Path.of(OUTPUT_PATH);
        if (!Files.exists(outputPathDir)) {
            Files.createDirectory(outputPathDir);
        }
        var resultFilePath = Path.of(OUTPUT_PATH + File.separator + RESULT_FILE_NAME);
        Files.move(Path.of(INPUT_PATH + File.separator + shortestName.getName()), resultFilePath, REPLACE_EXISTING);
    }
}
16
borschevskaya_anna_lab_2/second/Dockerfile
Normal file
@@ -0,0 +1,16 @@
FROM maven:3.8-eclipse-temurin-21-alpine AS build

WORKDIR /app

COPY pom.xml .
RUN mvn dependency:go-offline

COPY src ./src

RUN mvn clean package -DskipTests

FROM eclipse-temurin:21-jdk-alpine

COPY --from=build /app/target/*.jar /app.jar

CMD ["java", "-jar", "app.jar"]
36
borschevskaya_anna_lab_2/second/pom.xml
Normal file
@@ -0,0 +1,36 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>ru.second</groupId>
    <artifactId>second</artifactId>
    <version>1.0.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>21</maven.compiler.source>
        <maven.compiler.target>21</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>3.1.0</version>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <classpathPrefix>lib/</classpathPrefix>
                            <mainClass>ru.second.Main</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>
@@ -0,0 +1,51 @@
package ru.second;

import java.io.File;
import java.io.FileWriter;
import java.nio.file.Files;

import static java.util.Objects.isNull;

public class Main {

    public static final String INPUT_PATH = System.getenv("INPUT_PATH");
    public static final String INPUT_FILE_NAME = "data.txt";

    public static final String OUTPUT_PATH = System.getenv("OUTPUT_PATH");
    public static final String RESULT_FILE_NAME = "result.txt";

    public static void main(String[] args) {
        if (isNull(INPUT_PATH) || INPUT_PATH.isEmpty() || isNull(OUTPUT_PATH) || OUTPUT_PATH.isEmpty()) {
            System.out.printf("Missing environment variables INPUT_PATH = '%s' or OUTPUT_PATH = '%s'%n",
                    INPUT_PATH, OUTPUT_PATH);
            return;
        }

        var inputFile = new File(INPUT_PATH + File.separator + INPUT_FILE_NAME);
        if (!inputFile.exists()) {
            System.out.println("Input file does not exist");
            return;
        }

        try (var stream = Files.lines(inputFile.toPath());
             var writer = new FileWriter(OUTPUT_PATH + File.separator + RESULT_FILE_NAME);
        ) {
            var min = stream.map(Main::parseInt).reduce(Integer::min);
            if (min.isEmpty()) {
                System.out.println("No minimum value found among the lines of the file");
                return;
            }
            var minValue = Math.pow(min.get(), 3); // cube of the minimum
            System.out.printf("Got min value = '%d'%n", min.get());
            writer.append(Double.toString(minValue));
            System.out.printf("Wrote value %f to file %s%n", minValue, RESULT_FILE_NAME);
        } catch (Exception ex) {
            System.out.println(ex.getMessage());
        }
    }

    private static Integer parseInt(String line) {
        // strip surrounding whitespace before parsing
        return Integer.parseInt(line.trim());
    }
}
4
dolgov_dmitriy_lab_1/.gitignore
vendored
Normal file
@@ -0,0 +1,4 @@
data/
log/
wordpress/
custom/
34
dolgov_dmitriy_lab_1/README.md
Normal file
@@ -0,0 +1,34 @@
# Lab 1

## Completed by: Долгов Дмитрий, group ПИбд-42

### The following services were deployed:

* mediawiki (a wiki engine)
* wordpress (a popular content management system)
* gitea (a service for hosting git repositories)
* mariaDB

### The following technologies were used:

* git
* docker
* docker-compose

### To start the lab, run the following command in a terminal:
```
docker compose up -d
```

## Startup result:
```
[+] Running 4/4
✔ Container dolgov_dmitriy_lab_1-wordpress-1  Running  0.0s
✔ Container dolgov_dmitriy_lab_1-database-1   Running  0.0s
✔ Container dolgov_dmitriy_lab_1-mediawiki-1  Running  0.0s
✔ Container gitea                             Running
```

## Video with the startup result:

The video is available at this [link](https://drive.google.com/file/d/1hC6HhNvYBRuYVClobXyDMReA4ngwxhwc/view?usp=drive_link).
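Beyond the `docker compose` output above, the mapped host ports can be probed to confirm each web service answers. A minimal sketch; the port-to-service mapping is taken from the compose file below, while the script itself is only an illustration:

```python
import urllib.request

# Host ports from docker-compose.yml: MediaWiki 8080, Gitea 8081, WordPress 8082
services = {"mediawiki": 8080, "gitea": 8081, "wordpress": 8082}

for name, port in services.items():
    try:
        with urllib.request.urlopen(f"http://localhost:{port}/", timeout=5) as resp:
            print(f"{name}: HTTP {resp.status}")
    except Exception as exc:  # connection refused, timeout, etc.
        print(f"{name}: not reachable ({exc})")
```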
73
dolgov_dmitriy_lab_1/docker-compose.yml
Normal file
@@ -0,0 +1,73 @@
# Services for the assignment
services:
  # MediaWiki service
  mediawiki:
    # MediaWiki image
    image: mediawiki
    # restart automatically on failure
    restart: always
    # map container port 80 to host port 8080
    ports:
      - "8080:80"
    # link to the database service
    links:
      - database
    # mount a volume for data storage
    volumes:
      - images:/var/www/html/images

  # WordPress service
  wordpress:
    # WordPress image
    image: wordpress:latest
    ports:
      - "8082:80"
    # restart automatically on failure
    restart: always
    volumes:
      - ./wordpress:/var/www/html

  # Gitea service
  server:
    image: gitea/gitea:latest
    container_name: gitea
    restart: always
    environment:
      - USER_UID=1000
      - USER_GID=1000
    volumes:
      - ./data:/data
      - ./custom:/app/gitea/custom
      - ./log:/app/gitea/log
    ports:
      - "8081:3000"
    links:
      - database
    depends_on:
      - database

  # MariaDB service
  database:
    # MariaDB image
    image: mariadb
    # restart automatically on failure
    restart: always
    # environment variables for database configuration
    environment:
      # database name
      MYSQL_DATABASE: my_wiki
      # user name
      MYSQL_USER: user
      # user password
      MYSQL_PASSWORD: user
      # random password for the root user
      MYSQL_RANDOM_ROOT_PASSWORD: 'yes'
    # mount a volume for data storage
    volumes:
      - db:/var/lib/mysql

# volumes for data storage
volumes:
  images:
  db:
BIN  dolgov_dmitriy_lab_1/screenshots/image.png  Normal file  (Size: 275 KiB)
29
presnyakova_victoria_lab_1/docker-compose.yml
Normal file
@@ -0,0 +1,29 @@
services:
  rabbitmq:
    image: rabbitmq:3.12.8-management
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: admin
    ports:
      - 15672:15672
    volumes:
      - rabbitmq-data:/var/lib/rabbitmq

  mediawiki:
    image: mediawiki
    ports:
      - 8081:80
    volumes:
      - mediawiki-data:/var/files/mediawiki

  wordpress:
    image: wordpress
    ports:
      - 8082:80
    volumes:
      - wordpress-data:/var/files/wordpress

volumes:
  rabbitmq-data:
  mediawiki-data:
  wordpress-data:
26
presnyakova_victoria_lab_1/readme.md
Normal file
@@ -0,0 +1,26 @@
# Docker Compose: RabbitMQ, Mediawiki, Wordpress

## Project description

This project deploys three services with Docker Compose:
1. **RabbitMQ**, a message broker.
2. **Mediawiki**, a wiki engine.
3. **Wordpress**, a popular content management system.

## Startup commands

From the directory containing docker-compose.yml, start the services with `docker-compose up --build`.

## Services and ports
1. **RabbitMQ:**
   Available at http://localhost:15672/ (login: admin, password: admin).

2. **Mediawiki:**
   Available at http://localhost:8081/.

3. **Wordpress:**
   Available at http://localhost:8082/.

## Video: https://drive.google.com/file/d/1NvsMFoMU2ecsQ17EouqB_ZaLBskonHv0/view?usp=sharing
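Besides opening the UI, RabbitMQ availability can be verified against the management plugin's HTTP API on the same port. A minimal sketch using the credentials from the compose file above; `/api/overview` is a standard management-plugin endpoint:

```python
import base64
import json
import urllib.request

# Port and credentials come from docker-compose.yml (15672, admin/admin)
request = urllib.request.Request("http://localhost:15672/api/overview")
token = base64.b64encode(b"admin:admin").decode()
request.add_header("Authorization", f"Basic {token}")

with urllib.request.urlopen(request) as response:
    overview = json.load(response)

print(overview["rabbitmq_version"])
```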
2
tsukanova_irina_lab_2/.gitignore
vendored
Normal file
@@ -0,0 +1,2 @@
/.venv
/.idea
16
tsukanova_irina_lab_2/README.md
Normal file
@@ -0,0 +1,16 @@
# Цуканова Ирина, ПИбд-32
# Lab 2 - Developing a simple distributed application

### Application language: Python

## Chosen task variants
- Program 1: finds the largest file by size in the ```/var/data``` directory and moves it to ```/var/result/data.txt```.
- Program 2: saves the product of the first and last numbers from the file ```/var/result/data.txt``` into ```/var/result/result.txt```.

## Description:
Each program lives in its own folder: the first in worker-1, the second in worker-2.
The same folders contain the Dockerfiles with build instructions; significant lines are commented.
Mounted folders: ```data``` for ```/var/data``` and ```result``` for ```/var/result```.

## [Video](https://drive.google.com/file/d/1eBbIDgTo3MF4EeM677EPEKgJEINekaC0/view?usp=drive_link)
1
tsukanova_irina_lab_2/data/file_1.txt
Normal file
@@ -0,0 +1 @@
34 905 63 92 74 9 3 25 8 0 2 4 24 452 94 6 2 4 2 65 83 73 672 47 23 21 1
1
tsukanova_irina_lab_2/data/file_2.txt
Normal file
@@ -0,0 +1 @@
4 9 6 320 75 348 12 75 94 63 45 23 3
1
tsukanova_irina_lab_2/data/file_3.txt
Normal file
@@ -0,0 +1 @@
5 34 7 9 6 43 5 768 4 23 1 3 657 534 4 3 87 6 9 8 56 37 525 5 7 3 2 65 4 86 7 295 473 254 633 4 45 2
18
tsukanova_irina_lab_2/docker-compose.yaml
Normal file
@@ -0,0 +1,18 @@
services:

  worker_one:
    container_name: worker_one
    build:
      context: ./worker-1 # folder containing the first worker's Dockerfile
    volumes:
      - ./data:/var/data
      - ./result:/var/result

  worker_two:
    container_name: worker_two
    build:
      context: ./worker-2 # folder containing the second worker's Dockerfile
    volumes:
      - ./result:/var/result
    depends_on:
      - worker_one
1
tsukanova_irina_lab_2/result/data.txt
Normal file
@@ -0,0 +1 @@
5 34 7 9 6 43 5 768 4 23 1 3 657 534 4 3 87 6 9 8 56 37 525 5 7 3 2 65 4 86 7 295 473 254 633 4 45 2
1
tsukanova_irina_lab_2/result/result.txt
Normal file
@@ -0,0 +1 @@
10
11
tsukanova_irina_lab_2/worker-1/Dockerfile
Normal file
@@ -0,0 +1,11 @@
# Use a Python base image
FROM python:3.12-slim

# Set the working directory inside the container
WORKDIR /app

# Copy the script into the container
COPY worker_1.py .

# Command to run the Python script
CMD ["python", "worker_1.py"]
27
tsukanova_irina_lab_2/worker-1/worker_1.py
Normal file
@@ -0,0 +1,27 @@
import os
import shutil

# Finds the largest file by size in /var/data and moves it to /var/result/data.txt.
print("start worker_1")
dir_data = "/var/data"
dir_res = "/var/result"

if not os.path.exists(dir_data):
    os.mkdir(dir_data)

if not os.path.exists(dir_res):
    os.mkdir(dir_res)

largest_file = None
largest_size = 0

# walk the input directory tree and remember the biggest file
for root, dirs, files in os.walk(dir_data):
    for file in files:
        file_path = os.path.join(root, file)
        file_size = os.path.getsize(file_path)
        if file_size > largest_size:
            largest_size = file_size
            largest_file = file_path

if largest_file:
    # copy the winner into the shared result volume as data.txt
    shutil.copyfile(largest_file, dir_res + "/data.txt")
11
tsukanova_irina_lab_2/worker-2/Dockerfile
Normal file
@@ -0,0 +1,11 @@
# Use a Python base image
FROM python:3.12-slim

# Set the working directory inside the container
WORKDIR /app

# Copy the script into the container
COPY worker_2.py .

# Command to run the Python script
CMD ["python", "worker_2.py"]
19
tsukanova_irina_lab_2/worker-2/worker_2.py
Normal file
@@ -0,0 +1,19 @@
# Saves the product of the first and last numbers from /var/result/data.txt into /var/result/result.txt.

print("start worker_2")

with open('/var/result/data.txt', 'r') as f:
    # isdigit() keeps only non-negative integer tokens
    numbers = [int(num) for num in f.read().split() if num.isdigit()]

if numbers:
    first_number = numbers[0]
    last_number = numbers[-1]

    result = first_number * last_number

    with open('/var/result/result.txt', 'w') as f:
        f.write(f"{result}\n")

    print(f"Result obtained - {result}")
else:
    print("No result produced. The file contains no numbers")
27
vaksman_valeria_lab_4/Consumer_1.py
Normal file
@@ -0,0 +1,27 @@
import pika
import time

def callback(ch, method, properties, body):
    print(f'Consumer 1 received message: {body.decode()}')

    # processing delay required by the assignment
    time.sleep(2)

    print('Consumer 1 finished processing')

def consume_events_1():
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    # declare the queue
    channel.queue_declare(queue='consumer1_queue')
    # bind the queue to the exchange
    channel.queue_bind(exchange='beauty_salon_events', queue='consumer1_queue')

    channel.basic_consume(queue='consumer1_queue', on_message_callback=callback, auto_ack=True)

    print('Consumer 1 is waiting for messages...')
    channel.start_consuming()

if __name__ == "__main__":
    consume_events_1()
25
vaksman_valeria_lab_4/Consumer_2.py
Normal file
@@ -0,0 +1,25 @@
import pika

def callback(ch, method, properties, body):
    print(f'Consumer 2 received message: {body.decode()}')

    # processes "non-stop", without any delay
    print('Consumer 2 finished processing')

def consume_events_2():
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    # declare the queue
    channel.queue_declare(queue='consumer2_queue')

    # bind the queue to the exchange
    channel.queue_bind(exchange='beauty_salon_events', queue='consumer2_queue')

    channel.basic_consume(queue='consumer2_queue', on_message_callback=callback, auto_ack=True)

    print('Consumer 2 is waiting for messages...')
    channel.start_consuming()

if __name__ == "__main__":
    consume_events_2()
26
vaksman_valeria_lab_4/Publisher.py
Normal file
@@ -0,0 +1,26 @@
import pika
import time

def publish_events():
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    # declare a fanout exchange
    channel.exchange_declare(exchange='beauty_salon_events', exchange_type='fanout')

    events = [
        "An order for services arrived",
        "A message from a user",
        "A report needs to be created",
        "An appointment for a procedure",
        "A user cancelled an appointment"
    ]

    while True:
        # pick an event deterministically from the current time
        event = events[int(time.time()) % len(events)]
        channel.basic_publish(exchange='beauty_salon_events', routing_key='', body=event)
        print(f'Sent: {event}')
        time.sleep(1)

if __name__ == "__main__":
    publish_events()
60
vaksman_valeria_lab_4/README.md
Normal file
@@ -0,0 +1,60 @@
# Lab 4 - Working with a message broker

## Assignment

#### Goal:

Learn to design applications using a message broker.

#### Tasks:

* Install the RabbitMQ message broker.
* Complete lessons 1, 2 and 3 of the RabbitMQ Tutorials in any programming language.
* Demonstrate the message broker at work.

### Classes:
1. ```Publisher``` - the class responsible for sending messages

2. ```Consumer1``` - the class responsible for receiving and processing messages with a 3-second delay

3. ```Consumer2``` - the class responsible for receiving and processing messages without delays

#### Progress:

The ```RabbitMQ``` message broker was installed on the computer, after which all three program classes were started simultaneously.

## Program behaviour:

The ```Publisher``` class successfully sends messages to its clients.

The ```Consumer1``` class receives and processes messages with a 3-second delay, which can be seen in the video.

The ```Consumer2``` class receives and processes messages instantly.

## Working with the RabbitMQ Management UI

![](Rabbit.png "")

### The ```Consumer1``` queue

![](consumer1.png "")

### The ```Consumer2``` queue

![](consumer2.png "")
![](consumer2_2.png "")
![](consumer3-1.png "")

#### Lessons

Lesson 1:
![](lesson1.png "")

Lesson 2:
![](lesson2.png "")

Lesson 3:
![](lesson3.png "")

# VK
https://vk.com/video256017065_456239872
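Because the exchange is fanout, every published event should be delivered to both queues. Queue backlogs can be inspected with pika's passive declare; a minimal sketch under the same localhost assumptions as the scripts above:

```python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

for queue in ('consumer1_queue', 'consumer2_queue'):
    # passive=True inspects the queue without creating it
    status = channel.queue_declare(queue=queue, passive=True)
    print(f'{queue}: {status.method.message_count} messages waiting')

connection.close()
```

With everything running, a backlog tends to build on consumer1_queue, since its consumer spends more time per message than the publisher's one-second send interval.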
BIN  vaksman_valeria_lab_4/Rabbit.png  Normal file  (Size: 65 KiB)
BIN  vaksman_valeria_lab_4/consumer1.png  Normal file  (Size: 20 KiB)
BIN  vaksman_valeria_lab_4/consumer2.png  Normal file  (Size: 18 KiB)
BIN  vaksman_valeria_lab_4/consumer2_2.png  Normal file  (Size: 17 KiB)
BIN  vaksman_valeria_lab_4/consumer3-1.png  Normal file  (Size: 16 KiB)
BIN  vaksman_valeria_lab_4/lesson1.png  Normal file  (Size: 66 KiB)
BIN  vaksman_valeria_lab_4/lesson2.png  Normal file  (Size: 57 KiB)
BIN  vaksman_valeria_lab_4/lesson3.png  Normal file  (Size: 104 KiB)
25
vaksman_valeria_lab_4/lesson_1/receive.py
Normal file
@@ -0,0 +1,25 @@
import pika, sys, os

def main():
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    channel.queue_declare(queue='hello')

    def callback(ch, method, properties, body):
        print(f" [x] Received {body}")

    channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)

    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()

if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)
11
vaksman_valeria_lab_4/lesson_1/send.py
Normal file
@@ -0,0 +1,11 @@
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
19
vaksman_valeria_lab_4/lesson_2/new_task.py
Normal file
@@ -0,0 +1,19 @@
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(
    exchange='',
    routing_key='task_queue',
    body=message,
    properties=pika.BasicProperties(
        delivery_mode=pika.DeliveryMode.Persistent
    ))
print(f" [x] Sent {message}")
connection.close()
23
vaksman_valeria_lab_4/lesson_2/worker.py
Normal file
@@ -0,0 +1,23 @@
#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(f" [x] Received {body.decode()}")
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)

channel.start_consuming()
13
vaksman_valeria_lab_4/lesson_3/emit_log.py
Normal file
@@ -0,0 +1,13 @@
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(f" [x] Sent {message}")
connection.close()
22
vaksman_valeria_lab_4/lesson_3/receive_logs.py
Normal file
@@ -0,0 +1,22 @@
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs', queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(f" [x] {body}")

channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()
@@ -0,0 +1,222 @@
# don't import any costly modules
import sys
import os


is_pypy = '__pypy__' in sys.builtin_module_names


def warn_distutils_present():
    if 'distutils' not in sys.modules:
        return
    if is_pypy and sys.version_info < (3, 7):
        # PyPy for 3.6 unconditionally imports distutils, so bypass the warning
        # https://foss.heptapod.net/pypy/pypy/-/blob/be829135bc0d758997b3566062999ee8b23872b4/lib-python/3/site.py#L250
        return
    import warnings

    warnings.warn(
        "Distutils was imported before Setuptools, but importing Setuptools "
        "also replaces the `distutils` module in `sys.modules`. This may lead "
        "to undesirable behaviors or errors. To avoid these issues, avoid "
        "using distutils directly, ensure that setuptools is installed in the "
        "traditional way (e.g. not an editable install), and/or make sure "
        "that setuptools is always imported before distutils."
    )


def clear_distutils():
    if 'distutils' not in sys.modules:
        return
    import warnings

    warnings.warn("Setuptools is replacing distutils.")
    mods = [
        name
        for name in sys.modules
        if name == "distutils" or name.startswith("distutils.")
    ]
    for name in mods:
        del sys.modules[name]


def enabled():
    """
    Allow selection of distutils by environment variable.
    """
    which = os.environ.get('SETUPTOOLS_USE_DISTUTILS', 'local')
    return which == 'local'


def ensure_local_distutils():
    import importlib

    clear_distutils()

    # With the DistutilsMetaFinder in place,
    # perform an import to cause distutils to be
    # loaded from setuptools._distutils. Ref #2906.
    with shim():
        importlib.import_module('distutils')

    # check that submodules load as expected
    core = importlib.import_module('distutils.core')
    assert '_distutils' in core.__file__, core.__file__
    assert 'setuptools._distutils.log' not in sys.modules


def do_override():
    """
    Ensure that the local copy of distutils is preferred over stdlib.

    See https://github.com/pypa/setuptools/issues/417#issuecomment-392298401
    for more motivation.
    """
    if enabled():
        warn_distutils_present()
        ensure_local_distutils()


class _TrivialRe:
    def __init__(self, *patterns):
        self._patterns = patterns

    def match(self, string):
        return all(pat in string for pat in self._patterns)


class DistutilsMetaFinder:
    def find_spec(self, fullname, path, target=None):
        # optimization: only consider top level modules and those
        # found in the CPython test suite.
        if path is not None and not fullname.startswith('test.'):
            return

        method_name = 'spec_for_{fullname}'.format(**locals())
        method = getattr(self, method_name, lambda: None)
        return method()

    def spec_for_distutils(self):
        if self.is_cpython():
            return

        import importlib
        import importlib.abc
        import importlib.util

        try:
            mod = importlib.import_module('setuptools._distutils')
        except Exception:
            # There are a couple of cases where setuptools._distutils
            # may not be present:
            # - An older Setuptools without a local distutils is
            #   taking precedence. Ref #2957.
            # - Path manipulation during sitecustomize removes
            #   setuptools from the path but only after the hook
            #   has been loaded. Ref #2980.
            # In either case, fall back to stdlib behavior.
            return

        class DistutilsLoader(importlib.abc.Loader):
            def create_module(self, spec):
                mod.__name__ = 'distutils'
                return mod

            def exec_module(self, module):
                pass

        return importlib.util.spec_from_loader(
            'distutils', DistutilsLoader(), origin=mod.__file__
        )

    @staticmethod
    def is_cpython():
        """
        Suppress supplying distutils for CPython (build and tests).
        Ref #2965 and #3007.
        """
        return os.path.isfile('pybuilddir.txt')

    def spec_for_pip(self):
        """
        Ensure stdlib distutils when running under pip.
        See pypa/pip#8761 for rationale.
        """
        if self.pip_imported_during_build():
            return
        clear_distutils()
        self.spec_for_distutils = lambda: None

    @classmethod
    def pip_imported_during_build(cls):
        """
        Detect if pip is being imported in a build script. Ref #2355.
        """
        import traceback

        return any(
            cls.frame_file_is_setup(frame) for frame, line in traceback.walk_stack(None)
        )

    @staticmethod
    def frame_file_is_setup(frame):
        """
        Return True if the indicated frame suggests a setup.py file.
        """
        # some frames may not have __file__ (#2940)
        return frame.f_globals.get('__file__', '').endswith('setup.py')

    def spec_for_sensitive_tests(self):
        """
        Ensure stdlib distutils when running select tests under CPython.

        python/cpython#91169
        """
        clear_distutils()
        self.spec_for_distutils = lambda: None

    sensitive_tests = (
        [
            'test.test_distutils',
            'test.test_peg_generator',
            'test.test_importlib',
        ]
        if sys.version_info < (3, 10)
        else [
            'test.test_distutils',
        ]
    )


for name in DistutilsMetaFinder.sensitive_tests:
    setattr(
        DistutilsMetaFinder,
        f'spec_for_{name}',
        DistutilsMetaFinder.spec_for_sensitive_tests,
    )


DISTUTILS_FINDER = DistutilsMetaFinder()


def add_shim():
    DISTUTILS_FINDER in sys.meta_path or insert_shim()


class shim:
    def __enter__(self):
        insert_shim()

    def __exit__(self, exc, value, tb):
        remove_shim()


def insert_shim():
    sys.meta_path.insert(0, DISTUTILS_FINDER)


def remove_shim():
    try:
        sys.meta_path.remove(DISTUTILS_FINDER)
    except ValueError:
        pass
@@ -0,0 +1 @@
__import__('_distutils_hack').do_override()
@@ -0,0 +1 @@
import os; var = 'SETUPTOOLS_USE_DISTUTILS'; enabled = os.environ.get(var, 'local') == 'local'; enabled and __import__('_distutils_hack').add_shim();
@@ -0,0 +1 @@
pip
@@ -0,0 +1,25 @@
Copyright (c) 2009-2021, Tony Garnock-Jones, Gavin M. Roy, Pivotal Software, Inc and others.
All rights reserved.

Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:

 * Redistributions of source code must retain the above copyright notice, this
   list of conditions and the following disclaimer.
 * Redistributions in binary form must reproduce the above copyright notice,
   this list of conditions and the following disclaimer in the documentation
   and/or other materials provided with the distribution.
 * Neither the name of the Pika project nor the names of its contributors may be used
   to endorse or promote products derived from this software without specific
   prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
@@ -0,0 +1,320 @@
|
||||
Metadata-Version: 2.1
|
||||
Name: pika
|
||||
Version: 1.3.2
|
||||
Summary: Pika Python AMQP Client Library
|
||||
Maintainer-email: "Gavin M. Roy" <gavinmroy@gmail.com>, Luke Bakken <lukerbakken@gmail.com>
|
||||
License: BSD-3-Clause
|
||||
Project-URL: Homepage, https://pika.readthedocs.io
|
||||
Project-URL: Source, https://github.com/pika/pika
|
||||
Classifier: Development Status :: 5 - Production/Stable
|
||||
Classifier: Intended Audience :: Developers
|
||||
Classifier: License :: OSI Approved :: BSD License
|
||||
Classifier: Natural Language :: English
|
||||
Classifier: Operating System :: OS Independent
|
||||
Classifier: Programming Language :: Python :: 3
|
||||
Classifier: Programming Language :: Python :: 3.7
|
||||
Classifier: Programming Language :: Python :: 3.8
|
||||
Classifier: Programming Language :: Python :: 3.9
|
||||
Classifier: Programming Language :: Python :: 3.10
|
||||
Classifier: Programming Language :: Python :: 3.11
|
||||
Classifier: Programming Language :: Python :: Implementation :: CPython
|
||||
Classifier: Programming Language :: Python :: Implementation :: Jython
|
||||
Classifier: Programming Language :: Python :: Implementation :: PyPy
|
||||
Classifier: Topic :: Communications
|
||||
Classifier: Topic :: Internet
|
||||
Classifier: Topic :: Software Development :: Libraries
|
||||
Classifier: Topic :: Software Development :: Libraries :: Python Modules
|
||||
Classifier: Topic :: System :: Networking
|
||||
Requires-Python: >=3.7
|
||||
Description-Content-Type: text/x-rst
|
||||
License-File: LICENSE
|
||||
Provides-Extra: gevent
|
||||
Requires-Dist: gevent ; extra == 'gevent'
|
||||
Provides-Extra: tornado
|
||||
Requires-Dist: tornado ; extra == 'tornado'
|
||||
Provides-Extra: twisted
|
||||
Requires-Dist: twisted ; extra == 'twisted'
|
||||
|
||||
Pika
|
||||
====
|
||||
Pika is a RabbitMQ (AMQP 0-9-1) client library for Python.
|
||||
|
||||
|Version| |Python versions| |Actions Status| |Coverage| |License| |Docs|
|
||||
|
||||
Introduction
|
||||
------------
|
||||
Pika is a pure-Python implementation of the AMQP 0-9-1 protocol including
|
||||
RabbitMQ's extensions.
|
||||
|
||||
- Supports Python 3.7+ (`1.1.0` was the last version to support 2.7)
|
||||
- Since threads aren't appropriate to every situation, it doesn't require
|
||||
threads. Pika core takes care not to forbid them, either. The same goes for
|
||||
greenlets, callbacks, continuations, and generators. An instance of Pika's
|
||||
built-in connection adapters isn't thread-safe, however.
|
||||
- People may be using direct sockets, plain old ``select()``, or any of the
|
||||
wide variety of ways of getting network events to and from a Python
|
||||
application. Pika tries to stay compatible with all of these, and to make
|
||||
adapting it to a new environment as simple as possible.
|
||||
|
||||
Documentation
|
||||
-------------
|
||||
Pika's documentation can be found at https://pika.readthedocs.io.
|
||||
|
||||
Example
|
||||
-------
|
||||
Here is the most simple example of use, sending a message with the
|
||||
``pika.BlockingConnection`` adapter:
|
||||
|
||||
.. code :: python
|
||||
|
||||
import pika
|
||||
|
||||
connection = pika.BlockingConnection()
|
||||
channel = connection.channel()
|
||||
channel.basic_publish(exchange='test', routing_key='test',
|
||||
body=b'Test message.')
|
||||
connection.close()
|
||||
|
||||
And an example of writing a blocking consumer:
|
||||
|
||||
.. code :: python
|
||||
|
||||
import pika
|
||||
|
||||
connection = pika.BlockingConnection()
|
||||
channel = connection.channel()
|
||||
|
||||
for method_frame, properties, body in channel.consume('test'):
|
||||
# Display the message parts and acknowledge the message
|
||||
print(method_frame, properties, body)
|
||||
channel.basic_ack(method_frame.delivery_tag)
|
||||
|
||||
# Escape out of the loop after 10 messages
|
||||
if method_frame.delivery_tag == 10:
|
||||
break
|
||||
|
||||
# Cancel the consumer and return any pending messages
|
||||
requeued_messages = channel.cancel()
|
||||
print('Requeued %i messages' % requeued_messages)
|
||||
connection.close()
|
||||
|
||||
Pika provides the following adapters
|
||||
------------------------------------
|
||||
|
||||
- ``pika.adapters.asyncio_connection.AsyncioConnection`` - asynchronous adapter
|
||||
for Python 3 `AsyncIO <https://docs.python.org/3/library/asyncio.html>`_'s
|
||||
I/O loop.
|
||||
- ``pika.BlockingConnection`` - synchronous adapter on top of library for
|
||||
simple usage.
|
||||
- ``pika.SelectConnection`` - asynchronous adapter without third-party
|
||||
dependencies.
|
||||
- ``pika.adapters.gevent_connection.GeventConnection`` - asynchronous adapter
|
||||
for use with `Gevent <http://www.gevent.org>`_'s I/O loop.
|
||||
- ``pika.adapters.tornado_connection.TornadoConnection`` - asynchronous adapter
|
||||
for use with `Tornado <http://tornadoweb.org>`_'s I/O loop.
|
||||
- ``pika.adapters.twisted_connection.TwistedProtocolConnection`` - asynchronous
|
||||
adapter for use with `Twisted <http://twistedmatrix.com>`_'s I/O loop.
|
||||
|
||||
Multiple connection parameters
|
||||
------------------------------
|
||||
You can also pass multiple ``pika.ConnectionParameters`` instances for
|
||||
fault-tolerance as in the code snippet below (host names are just examples, of
|
||||
course). To enable retries, set ``connection_attempts`` and ``retry_delay`` as
|
||||
needed in the last ``pika.ConnectionParameters`` element of the sequence.
|
||||
Retries occur after connection attempts using all of the given connection
|
||||
parameters fail.
|
||||
|
||||
.. code :: python
|
||||
|
||||
import pika
|
||||
|
||||
parameters = (
|
||||
pika.ConnectionParameters(host='rabbitmq.zone1.yourdomain.com'),
|
||||
pika.ConnectionParameters(host='rabbitmq.zone2.yourdomain.com',
|
||||
connection_attempts=5, retry_delay=1))
|
||||
connection = pika.BlockingConnection(parameters)
|
||||
|
||||
With non-blocking adapters, such as ``pika.SelectConnection`` and
|
||||
``pika.adapters.asyncio_connection.AsyncioConnection``, you can request a
|
||||
connection using multiple connection parameter instances via the connection
|
||||
adapter's ``create_connection()`` class method.
|
||||
|
||||
Requesting message acknowledgements from another thread
|
||||
-------------------------------------------------------
|
||||
The single-threaded usage constraint of an individual Pika connection adapter
|
||||
instance may result in a dropped AMQP/stream connection due to AMQP heartbeat
|
||||
timeout in consumers that take a long time to process an incoming message. A
|
||||
common solution is to delegate processing of the incoming messages to another
|
||||
thread, while the connection adapter's thread continues to service its I/O
|
||||
loop's message pump, permitting AMQP heartbeats and other I/O to be serviced in
|
||||
a timely fashion.
|
||||
|
||||
Messages processed in another thread may not be acknowledged directly from that
|
||||
thread, since all accesses to the connection adapter instance must be from a
|
||||
single thread, which is the thread running the adapter's I/O loop. This is
|
||||
accomplished by requesting a callback to be executed in the adapter's
|
||||
I/O loop thread. For example, the callback function's implementation might look
|
||||
like this:
|
||||
|
||||
.. code :: python
|
||||
|
||||
def ack_message(channel, delivery_tag):
|
||||
"""Note that `channel` must be the same Pika channel instance via which
|
||||
the message being acknowledged was retrieved (AMQP protocol constraint).
|
||||
"""
|
||||
if channel.is_open:
|
||||
channel.basic_ack(delivery_tag)
|
||||
else:
|
||||
# Channel is already closed, so we can't acknowledge this message;
|
||||
# log and/or do something that makes sense for your app in this case.
|
||||
pass
|
||||
|
||||
The code running in the other thread may request the ``ack_message()`` function
|
||||
to be executed in the connection adapter's I/O loop thread using an
|
||||
adapter-specific mechanism:
|
||||
|
||||
- ``pika.BlockingConnection`` abstracts its I/O loop from the application and
|
||||
thus exposes ``pika.BlockingConnection.add_callback_threadsafe()``. Refer to
|
||||
this method's docstring for additional information. For example:
|
||||
|
||||
.. code :: python
|
||||
|
||||
connection.add_callback_threadsafe(functools.partial(ack_message, channel, delivery_tag))
|
||||
|
||||
- When using a non-blocking connection adapter, such as
|
||||
``pika.adapters.asyncio_connection.AsyncioConnection`` or
|
||||
``pika.SelectConnection``, you use the underlying asynchronous framework's
|
||||
native API for requesting an I/O loop-bound callback from another thread. For
|
||||
example, ``pika.SelectConnection``'s I/O loop provides
|
||||
``add_callback_threadsafe()``,
|
||||
``pika.adapters.tornado_connection.TornadoConnection``'s I/O loop has
|
||||
``add_callback()``, while
|
||||
``pika.adapters.asyncio_connection.AsyncioConnection``'s I/O loop exposes
|
||||
``call_soon_threadsafe()``.
|
||||
|
||||
This threadsafe callback request mechanism may also be used to delegate
|
||||
publishing of messages, etc., from a background thread to the connection
|
||||
adapter's thread.
|
||||
|
||||
Connection recovery
|
||||
-------------------
|
||||
|
||||
Some RabbitMQ clients (Bunny, Java, .NET, Objective-C, Swift) provide a way to
|
||||
automatically recover a connection, its channels and topology (e.g. queues,
|
||||
bindings and consumers) after a network failure. Others require connection
|
||||
recovery to be performed by the application code and strive to make it a
|
||||
straightforward process. Pika falls into the second category.
|
||||
|
||||
Pika supports multiple connection adapters. They take different approaches to
|
||||
connection recovery.
|
||||
|
||||
For ``pika.BlockingConnection`` adapter exception handling can be used to check
|
||||
for connection errors. Here is a very basic example:
|
||||
|
||||
.. code :: python
|
||||
|
||||
import pika
|
||||
|
||||
while True:
|
||||
try:
|
||||
connection = pika.BlockingConnection()
|
||||
channel = connection.channel()
|
||||
channel.basic_consume('test', on_message_callback)
|
||||
channel.start_consuming()
|
||||
# Don't recover if connection was closed by broker
|
||||
except pika.exceptions.ConnectionClosedByBroker:
|
||||
break
|
||||
# Don't recover on channel errors
|
||||
except pika.exceptions.AMQPChannelError:
|
||||
break
|
||||
# Recover on all other connection errors
|
||||
except pika.exceptions.AMQPConnectionError:
|
||||
continue
|
||||
|
||||
This example can be found in `examples/consume_recover.py`.
|
||||
|
||||
Generic operation retry libraries such as
|
||||
`retry <https://github.com/invl/retry>`_ can be used. Decorators make it
|
||||
possible to configure some additional recovery behaviours, like delays between
|
||||
retries and limiting the number of retries:
|
||||
|
||||
.. code :: python
|
||||
|
||||
from retry import retry
|
||||
|
||||
|
||||
@retry(pika.exceptions.AMQPConnectionError, delay=5, jitter=(1, 3))
|
||||
def consume():
|
||||
connection = pika.BlockingConnection()
|
||||
channel = connection.channel()
|
||||
channel.basic_consume('test', on_message_callback)
|
||||
|
||||
try:
|
||||
channel.start_consuming()
|
||||
# Don't recover connections closed by server
|
||||
except pika.exceptions.ConnectionClosedByBroker:
|
||||
pass
|
||||
|
||||
|
||||
consume()
|
||||
|
||||
This example can be found in `examples/consume_recover_retry.py`.
|
||||
|
||||
For asynchronous adapters, use ``on_close_callback`` to react to connection
|
||||
failure events. This callback can be used to clean up and recover the
|
||||
connection.
|
||||
|
||||
An example of recovery using ``on_close_callback`` can be found in
|
||||
`examples/asynchronous_consumer_example.py`.
|
||||
|
||||
Contributing
|
||||
------------
|
||||
To contribute to Pika, please make sure that any new features or changes to
|
||||
existing functionality **include test coverage**.
|
||||
|
||||
*Pull requests that add or change code without adequate test coverage will be
|
||||
rejected.*
|
||||
|
||||
Additionally, please format your code using
|
||||
`Yapf <http://pypi.python.org/pypi/yapf>`_ with ``google`` style prior to
|
||||
issuing your pull request. *Note: only format those lines that you have changed
|
||||
in your pull request. If you format an entire file and change code outside of
|
||||
the scope of your PR, it will likely be rejected.*
|
||||
|
||||
Extending to support additional I/O frameworks
----------------------------------------------

New non-blocking adapters may be implemented in either of the following ways:

- By subclassing ``pika.BaseConnection``, implementing its abstract method and
  passing its constructor an implementation of
  ``pika.adapters.utils.nbio_interface.AbstractIOServices``.
  ``pika.BaseConnection`` implements ``pika.connection.Connection``'s abstract
  methods, including internally-initiated connection logic. For examples, refer
  to the implementations of
  ``pika.adapters.asyncio_connection.AsyncioConnection``,
  ``pika.adapters.gevent_connection.GeventConnection`` and
  ``pika.adapters.tornado_connection.TornadoConnection``.
- By subclassing ``pika.connection.Connection`` and implementing its abstract
  methods. This approach facilitates implementation of custom
  connection-establishment and transport mechanisms. For an example, refer to
  the implementation of
  ``pika.adapters.twisted_connection.TwistedProtocolConnection``.

.. |Version| image:: https://img.shields.io/pypi/v/pika.svg?
   :target: http://badge.fury.io/py/pika

.. |Python versions| image:: https://img.shields.io/pypi/pyversions/pika.svg
   :target: https://pypi.python.org/pypi/pika

.. |Actions Status| image:: https://github.com/pika/pika/actions/workflows/main.yaml/badge.svg
   :target: https://github.com/pika/pika/actions/workflows/main.yaml

.. |Coverage| image:: https://img.shields.io/codecov/c/github/pika/pika.svg?
   :target: https://codecov.io/github/pika/pika?branch=main

.. |License| image:: https://img.shields.io/pypi/l/pika.svg?
   :target: https://pika.readthedocs.io

.. |Docs| image:: https://readthedocs.org/projects/pika/badge/?version=stable
   :target: https://pika.readthedocs.io
   :alt: Documentation Status

pika-1.3.2.dist-info/RECORD
@@ -0,0 +1,68 @@
pika-1.3.2.dist-info/INSTALLER,sha256=zuuue4knoyJ-UwPPXg8fezS7VCrXJQrAP7zeNuwvFQg,4
pika-1.3.2.dist-info/LICENSE,sha256=21HeRCNF3Ho285qYfFcVgx8Kfy0lg65jKQFNDDIvYrk,1552
pika-1.3.2.dist-info/METADATA,sha256=VHmz3pPAkWR45GIHPzGR3dswemh0G--XHdeB_xsqXZ4,13052
pika-1.3.2.dist-info/RECORD,,
pika-1.3.2.dist-info/REQUESTED,sha256=47DEQpj8HBSa-_TImW-5JCeuQeRkm5NMpJWZG3hSuFU,0
pika-1.3.2.dist-info/WHEEL,sha256=pkctZYzUS4AYVn6dJ-7367OJZivF2e8RA9b_ZBjif18,92
pika-1.3.2.dist-info/top_level.txt,sha256=ATSgSqw16wMD-Dht8AJlcJobg1xKjOwpbPEvKZJqfzg,5
pika-1.3.2.dist-info/zip-safe,sha256=AbpHGcgLb-kRsJGnwFEktk7uzpZOCcBY74-YBdrKVGs,1
pika/__init__.py,sha256=fGTPYlzKxqsnr6aYkMTkp1GVLl71BXcXtCY7QLSyTvI,693
pika/__pycache__/__init__.cpython-311.pyc,,
pika/__pycache__/amqp_object.cpython-311.pyc,,
pika/__pycache__/callback.cpython-311.pyc,,
pika/__pycache__/channel.cpython-311.pyc,,
pika/__pycache__/compat.cpython-311.pyc,,
pika/__pycache__/connection.cpython-311.pyc,,
pika/__pycache__/credentials.cpython-311.pyc,,
pika/__pycache__/data.cpython-311.pyc,,
pika/__pycache__/delivery_mode.cpython-311.pyc,,
pika/__pycache__/diagnostic_utils.cpython-311.pyc,,
pika/__pycache__/exceptions.cpython-311.pyc,,
pika/__pycache__/exchange_type.cpython-311.pyc,,
pika/__pycache__/frame.cpython-311.pyc,,
pika/__pycache__/heartbeat.cpython-311.pyc,,
pika/__pycache__/spec.cpython-311.pyc,,
pika/__pycache__/tcp_socket_opts.cpython-311.pyc,,
pika/__pycache__/validators.cpython-311.pyc,,
pika/adapters/__init__.py,sha256=GTUPWmxBgTEuZp0zrOqX9GX5gIwZfv3nTWi1uXwdBUU,994
pika/adapters/__pycache__/__init__.cpython-311.pyc,,
pika/adapters/__pycache__/asyncio_connection.cpython-311.pyc,,
pika/adapters/__pycache__/base_connection.cpython-311.pyc,,
pika/adapters/__pycache__/blocking_connection.cpython-311.pyc,,
pika/adapters/__pycache__/gevent_connection.cpython-311.pyc,,
pika/adapters/__pycache__/select_connection.cpython-311.pyc,,
pika/adapters/__pycache__/tornado_connection.cpython-311.pyc,,
pika/adapters/__pycache__/twisted_connection.cpython-311.pyc,,
pika/adapters/asyncio_connection.py,sha256=z9SmKmFCft_PPq-mD_AS_PsOijMHwH2mAnMVsTEmkVY,9235
pika/adapters/base_connection.py,sha256=5QpA2q1qlV8sHSMHITVb6hDCbn3ErXQOc23MQi66oOQ,20525
pika/adapters/blocking_connection.py,sha256=gtXhod6U649z5WvnpWfdUoi9qZtkbknT1DioYt6Gkmw,110248
pika/adapters/gevent_connection.py,sha256=RM-Ol6PcFyrkBqFx7tOVOhXQGZKqHGgOSuLPO7fgqdA,16735
pika/adapters/select_connection.py,sha256=Jp4JfqhptZ955apFhymJzM2oK7P8O9D7hKf0fTZ9xC0,45162
pika/adapters/tornado_connection.py,sha256=q5QwUBkC_pSq8kr9R3TorInpXINM9sbFHYJsj2-4rjM,3559
pika/adapters/twisted_connection.py,sha256=hACt_5Lxs5F0nsbe5qG-h9lrKEPfL_VGnjUhonet4Is,50294
pika/adapters/utils/__init__.py,sha256=47DEQpj8HBSa-_TImW-5JCeuQeRkm5NMpJWZG3hSuFU,0
pika/adapters/utils/__pycache__/__init__.cpython-311.pyc,,
pika/adapters/utils/__pycache__/connection_workflow.cpython-311.pyc,,
pika/adapters/utils/__pycache__/io_services_utils.cpython-311.pyc,,
pika/adapters/utils/__pycache__/nbio_interface.cpython-311.pyc,,
pika/adapters/utils/__pycache__/selector_ioloop_adapter.cpython-311.pyc,,
pika/adapters/utils/connection_workflow.py,sha256=8qDbckjKXHv9WmymheKr6pm1_KQm9cKSf11ipSW_ydE,33845
pika/adapters/utils/io_services_utils.py,sha256=fCYJFW7t3cN3aYW9GuUBnMHf0Aq-UlytnOulufer2sg,53574
pika/adapters/utils/nbio_interface.py,sha256=fiz6CMh5FvrV0_cjO1DsrpA4YINRYhWIdVp67SZLh3g,17217
pika/adapters/utils/selector_ioloop_adapter.py,sha256=F55JelsVb3agQ7z-1V8I3fo1JxTxhXy-0J53KshqYVI,19955
pika/amqp_object.py,sha256=-7u2LCPy-DQODB30Jt7WfxsINuMXmSu5BjqITCzWMbk,1830
pika/callback.py,sha256=l_9kQBW5Ta4f1tPuVUaddjfFqwu1uPyoBPDXSP4-KLw,14743
pika/channel.py,sha256=ynFkBA7JY5Axvi5uCGhhKmfU4IjbngO0SRhoBF6NMAw,63112
pika/compat.py,sha256=f_zeLXYgzwj8zFh6ZjUa7b-vEpm7c7hK_4bMYrfH28E,7221
pika/connection.py,sha256=14L1q-u1Jc1t_HG062E5TWRSdHgTui_tmrOAzcoFhGw,87627
pika/credentials.py,sha256=1EIMKuFrWLqCWitRFiqLRCx25HTYid-tnQGH-4gJ4Yg,4646
pika/data.py,sha256=SxYCuololigT9w3tLvZY3AJtBcSb0UJoxXoQlvZ7Ong,9956
pika/delivery_mode.py,sha256=nMjT1H7REUFhxPtvqsovQy03AemnhKLcmlq3UQ0NeSI,87
pika/diagnostic_utils.py,sha256=nxYnlMJejjoDtfUx_3QBgy1y-jwy1bghEYfejphakuY,1759
pika/exceptions.py,sha256=CjT1lYgvUe79zGOAmvTGsylTUKGz_YCdnzL7FDYyhqw,10117
pika/exchange_type.py,sha256=afA-c21QmVNuuUHKoHYYvrIg6oUOXThd4lpPsDz1ilE,390
pika/frame.py,sha256=0IhYLWvEeRvHhB4K-z3jF6Dv_FoxskzNuAXhq9LFpuo,7744
pika/heartbeat.py,sha256=hoORwZHRZsi3Im6o7y05H86dTiO9-XAK4s28wsjjiK8,8170
pika/spec.py,sha256=2pQLO4hFOu0QNRA1ry6kGpqJ6yQRAZviMoOrmrb4CYQ,80448
pika/tcp_socket_opts.py,sha256=QPU762BQKqY94RohuG57fz0EZ5CyabezcJxySouckAs,1513
pika/validators.py,sha256=iLw8dwbnNaEK3yd_TU-BplIpQ4jx4SMYSmTR-HikNc0,1419

pika-1.3.2.dist-info/WHEEL
@@ -0,0 +1,5 @@
Wheel-Version: 1.0
Generator: bdist_wheel (0.40.0)
Root-Is-Purelib: true
Tag: py3-none-any

pika-1.3.2.dist-info/top_level.txt
@@ -0,0 +1 @@
pika

pika-1.3.2.dist-info/zip-safe
@@ -0,0 +1 @@

pika/__init__.py
@@ -0,0 +1,22 @@
__version__ = '1.3.2'

import logging

# Add NullHandler before importing Pika modules to prevent logging warnings
logging.getLogger(__name__).addHandler(logging.NullHandler())

# pylint: disable=C0413

from pika.connection import ConnectionParameters
from pika.connection import URLParameters
from pika.connection import SSLOptions
from pika.credentials import PlainCredentials
from pika.spec import BasicProperties
from pika.delivery_mode import DeliveryMode

from pika import adapters
from pika.adapters import BaseConnection
from pika.adapters import BlockingConnection
from pika.adapters import SelectConnection

from pika.adapters.utils.connection_workflow import AMQPConnectionWorkflow

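Since the package only installs a ``NullHandler``, applications that want
pika's diagnostics enable logging themselves; a minimal sketch:

.. code :: python

    import logging

    import pika

    logging.basicConfig(level=logging.INFO)  # application-level choice
    connection = pika.BlockingConnection()   # connection activity now logged
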
pika/adapters/__init__.py
@@ -0,0 +1,23 @@
"""
Connection Adapters
===================

Pika provides multiple adapters to connect to RabbitMQ:

- adapters.asyncio_connection.AsyncioConnection: Native Python3 AsyncIO use
- adapters.blocking_connection.BlockingConnection: Enables blocking,
  synchronous operation on top of library for simple uses.
- adapters.gevent_connection.GeventConnection: Connection adapter for use with
  Gevent.
- adapters.select_connection.SelectConnection: A native event based connection
  adapter that implements select, kqueue, poll and epoll.
- adapters.tornado_connection.TornadoConnection: Connection adapter for use
  with the Tornado web framework.
- adapters.twisted_connection.TwistedProtocolConnection: Connection adapter for
  use with the Twisted framework

"""
from pika.adapters.base_connection import BaseConnection
from pika.adapters.blocking_connection import BlockingConnection
from pika.adapters.select_connection import SelectConnection
from pika.adapters.select_connection import IOLoop

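Choosing among these adapters is a one-line decision at connection time; a
script with no event loop of its own would typically take
``BlockingConnection``. A minimal sketch (broker address and queue name are
assumptions):

.. code :: python

    import pika

    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='demo')
    channel.basic_publish(exchange='', routing_key='demo', body=b'hi')
    connection.close()
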
pika/adapters/asyncio_connection.py
@@ -0,0 +1,283 @@
"""Use pika with the Asyncio EventLoop"""

import asyncio
import logging
import sys

from pika.adapters import base_connection
from pika.adapters.utils import nbio_interface, io_services_utils

LOGGER = logging.getLogger(__name__)


if sys.platform == 'win32':
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())


class AsyncioConnection(base_connection.BaseConnection):
    """ The AsyncioConnection runs on the Asyncio EventLoop.

    """

    def __init__(self,
                 parameters=None,
                 on_open_callback=None,
                 on_open_error_callback=None,
                 on_close_callback=None,
                 custom_ioloop=None,
                 internal_connection_workflow=True):
        """ Create a new instance of the AsyncioConnection class, connecting
        to RabbitMQ automatically

        :param pika.connection.Parameters parameters: Connection parameters
        :param callable on_open_callback: The method to call when the connection
            is open
        :param None | method on_open_error_callback: Called if the connection
            can't be established or connection establishment is interrupted by
            `Connection.close()`: on_open_error_callback(Connection, exception).
        :param None | method on_close_callback: Called when a previously fully
            open connection is closed:
            `on_close_callback(Connection, exception)`, where `exception` is
            either an instance of `exceptions.ConnectionClosed` if closed by
            user or broker or exception of another type that describes the cause
            of connection failure.
        :param None | asyncio.AbstractEventLoop |
            nbio_interface.AbstractIOServices custom_ioloop:
                Defaults to asyncio.get_event_loop().
        :param bool internal_connection_workflow: True for autonomous connection
            establishment which is default; False for externally-managed
            connection workflow via the `create_connection()` factory.

        """
        if isinstance(custom_ioloop, nbio_interface.AbstractIOServices):
            nbio = custom_ioloop
        else:
            nbio = _AsyncioIOServicesAdapter(custom_ioloop)

        super().__init__(
            parameters,
            on_open_callback,
            on_open_error_callback,
            on_close_callback,
            nbio,
            internal_connection_workflow=internal_connection_workflow)

    @classmethod
    def create_connection(cls,
                          connection_configs,
                          on_done,
                          custom_ioloop=None,
                          workflow=None):
        """Implement
        :py:classmethod::`pika.adapters.BaseConnection.create_connection()`.

        """
        nbio = _AsyncioIOServicesAdapter(custom_ioloop)

        def connection_factory(params):
            """Connection factory."""
            if params is None:
                raise ValueError('Expected pika.connection.Parameters '
                                 'instance, but got None in params arg.')
            return cls(
                parameters=params,
                custom_ioloop=nbio,
                internal_connection_workflow=False)

        return cls._start_connection_workflow(
            connection_configs=connection_configs,
            connection_factory=connection_factory,
            nbio=nbio,
            workflow=workflow,
            on_done=on_done)


class _AsyncioIOServicesAdapter(io_services_utils.SocketConnectionMixin,
                                io_services_utils.StreamingConnectionMixin,
                                nbio_interface.AbstractIOServices,
                                nbio_interface.AbstractFileDescriptorServices):
    """Implements
    :py:class:`.utils.nbio_interface.AbstractIOServices` interface
    on top of `asyncio`.

    NOTE:
    :py:class:`.utils.nbio_interface.AbstractFileDescriptorServices`
    interface is only required by the mixins.

    """

    def __init__(self, loop=None):
        """
        :param asyncio.AbstractEventLoop | None loop: If None, gets default
            event loop from asyncio.

        """
        self._loop = loop or asyncio.get_event_loop()

    def get_native_ioloop(self):
        """Implement
        :py:meth:`.utils.nbio_interface.AbstractIOServices.get_native_ioloop()`.

        """
        return self._loop

    def close(self):
        """Implement
        :py:meth:`.utils.nbio_interface.AbstractIOServices.close()`.

        """
        self._loop.close()

    def run(self):
        """Implement :py:meth:`.utils.nbio_interface.AbstractIOServices.run()`.

        """
        self._loop.run_forever()

    def stop(self):
        """Implement :py:meth:`.utils.nbio_interface.AbstractIOServices.stop()`.

        """
        self._loop.stop()

    def add_callback_threadsafe(self, callback):
        """Implement
        :py:meth:`.utils.nbio_interface.AbstractIOServices.add_callback_threadsafe()`.

        """
        self._loop.call_soon_threadsafe(callback)

    def call_later(self, delay, callback):
        """Implement
        :py:meth:`.utils.nbio_interface.AbstractIOServices.call_later()`.

        """
        return _TimerHandle(self._loop.call_later(delay, callback))

    def getaddrinfo(self,
                    host,
                    port,
                    on_done,
                    family=0,
                    socktype=0,
                    proto=0,
                    flags=0):
        """Implement
        :py:meth:`.utils.nbio_interface.AbstractIOServices.getaddrinfo()`.

        """
        return self._schedule_and_wrap_in_io_ref(
            self._loop.getaddrinfo(
                host,
                port,
                family=family,
                type=socktype,
                proto=proto,
                flags=flags), on_done)

    def set_reader(self, fd, on_readable):
        """Implement
        :py:meth:`.utils.nbio_interface.AbstractFileDescriptorServices.set_reader()`.

        """
        self._loop.add_reader(fd, on_readable)
        LOGGER.debug('set_reader(%s, _)', fd)

    def remove_reader(self, fd):
        """Implement
        :py:meth:`.utils.nbio_interface.AbstractFileDescriptorServices.remove_reader()`.

        """
        LOGGER.debug('remove_reader(%s)', fd)
        return self._loop.remove_reader(fd)

    def set_writer(self, fd, on_writable):
        """Implement
        :py:meth:`.utils.nbio_interface.AbstractFileDescriptorServices.set_writer()`.

        """
        self._loop.add_writer(fd, on_writable)
        LOGGER.debug('set_writer(%s, _)', fd)

    def remove_writer(self, fd):
        """Implement
        :py:meth:`.utils.nbio_interface.AbstractFileDescriptorServices.remove_writer()`.

        """
        LOGGER.debug('remove_writer(%s)', fd)
        return self._loop.remove_writer(fd)

    def _schedule_and_wrap_in_io_ref(self, coro, on_done):
        """Schedule the coroutine to run and return _AsyncioIOReference

        :param coroutine-obj coro:
        :param callable on_done: user callback that takes the completion result
            or exception as its only arg. It will not be called if the operation
            was cancelled.
        :rtype: _AsyncioIOReference which is derived from
            nbio_interface.AbstractIOReference

        """
        if not callable(on_done):
            raise TypeError(
                'on_done arg must be callable, but got {!r}'.format(on_done))

        return _AsyncioIOReference(
            asyncio.ensure_future(coro, loop=self._loop), on_done)


class _TimerHandle(nbio_interface.AbstractTimerReference):
    """This module's adaptation of `nbio_interface.AbstractTimerReference`.

    """

    def __init__(self, handle):
        """

        :param asyncio.Handle handle:
        """
        self._handle = handle

    def cancel(self):
        if self._handle is not None:
            self._handle.cancel()
            self._handle = None


class _AsyncioIOReference(nbio_interface.AbstractIOReference):
    """This module's adaptation of `nbio_interface.AbstractIOReference`.

    """

    def __init__(self, future, on_done):
        """
        :param asyncio.Future future:
        :param callable on_done: user callback that takes the completion result
            or exception as its only arg. It will not be called if the operation
            was cancelled.

        """
        if not callable(on_done):
            raise TypeError(
                'on_done arg must be callable, but got {!r}'.format(on_done))

        self._future = future

        def on_done_adapter(future):
            """Handle completion callback from the future instance"""

            # NOTE: Asyncio schedules callback for cancelled futures, but pika
            # doesn't want that
            if not future.cancelled():
                on_done(future.exception() or future.result())

        future.add_done_callback(on_done_adapter)

    def cancel(self):
        """Cancel pending operation

        :returns: False if was already done or cancelled; True otherwise
        :rtype: bool

        """
        return self._future.cancel()

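Putting this adapter to work looks like the following minimal sketch (it
assumes a broker on localhost and merely opens a channel once connected):

.. code :: python

    import pika
    from pika.adapters.asyncio_connection import AsyncioConnection

    def on_open(connection):
        connection.channel(on_open_callback=lambda channel: print('ready'))

    conn = AsyncioConnection(parameters=pika.ConnectionParameters('localhost'),
                             on_open_callback=on_open)
    conn.ioloop.run_forever()  # the adapter's ioloop property is the asyncio loop
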
pika/adapters/base_connection.py
@@ -0,0 +1,501 @@
"""Base class extended by connection adapters. This extends the
connection.Connection class to encapsulate connection behavior but still
isolate socket and low level communication.

"""
import abc
import functools
import logging

import pika.compat
import pika.exceptions
import pika.tcp_socket_opts

from pika.adapters.utils import connection_workflow, nbio_interface
from pika import connection

LOGGER = logging.getLogger(__name__)


class BaseConnection(connection.Connection):
    """BaseConnection class that should be extended by connection adapters.

    This class abstracts I/O loop and transport services from pika core.

    """

    def __init__(self, parameters, on_open_callback, on_open_error_callback,
                 on_close_callback, nbio, internal_connection_workflow):
        """Create a new instance of the Connection object.

        :param None|pika.connection.Parameters parameters: Connection parameters
        :param None|method on_open_callback: Method to call on connection open
        :param None | method on_open_error_callback: Called if the connection
            can't be established or connection establishment is interrupted by
            `Connection.close()`: on_open_error_callback(Connection, exception).
        :param None | method on_close_callback: Called when a previously fully
            open connection is closed:
            `on_close_callback(Connection, exception)`, where `exception` is
            either an instance of `exceptions.ConnectionClosed` if closed by
            user or broker or exception of another type that describes the cause
            of connection failure.
        :param pika.adapters.utils.nbio_interface.AbstractIOServices nbio:
            asynchronous services
        :param bool internal_connection_workflow: True for autonomous connection
            establishment which is default; False for externally-managed
            connection workflow via the `create_connection()` factory.
        :raises: RuntimeError
        :raises: ValueError

        """
        if parameters and not isinstance(parameters, connection.Parameters):
            raise ValueError(
                'Expected instance of Parameters, not %r' % (parameters,))

        self._nbio = nbio

        self._connection_workflow = None  # type: connection_workflow.AMQPConnectionWorkflow
        self._transport = None  # type: pika.adapters.utils.nbio_interface.AbstractStreamTransport

        self._got_eof = False  # transport indicated EOF (connection reset)

        super(BaseConnection, self).__init__(
            parameters,
            on_open_callback,
            on_open_error_callback,
            on_close_callback,
            internal_connection_workflow=internal_connection_workflow)

    def _init_connection_state(self):
        """Initialize or reset all of our internal state variables for a given
        connection. If we disconnect and reconnect, all of our state needs to
        be wiped.

        """
        super(BaseConnection, self)._init_connection_state()

        self._connection_workflow = None
        self._transport = None
        self._got_eof = False

    def __repr__(self):

        # def get_socket_repr(sock):
        #     """Return socket info suitable for use in repr"""
        #     if sock is None:
        #         return None
        #
        #     sockname = None
        #     peername = None
        #     try:
        #         sockname = sock.getsockname()
        #     except pika.compat.SOCKET_ERROR:
        #         # closed?
        #         pass
        #     else:
        #         try:
        #             peername = sock.getpeername()
        #         except pika.compat.SOCKET_ERROR:
        #             # not connected?
        #             pass
        #
        #     return '%s->%s' % (sockname, peername)
        # TODO need helpful __repr__ in transports
        return ('<%s %s transport=%s params=%s>' % (
            self.__class__.__name__, self._STATE_NAMES[self.connection_state],
            self._transport, self.params))

    @classmethod
    @abc.abstractmethod
    def create_connection(cls,
                          connection_configs,
                          on_done,
                          custom_ioloop=None,
                          workflow=None):
        """Asynchronously create a connection to an AMQP broker using the given
        configurations. Will attempt to connect using each config in the given
        order, including all compatible resolved IP addresses of the hostname
        supplied in each config, until one is established or all attempts fail.

        See also `_start_connection_workflow()`.

        :param sequence connection_configs: A sequence of one or more
            `pika.connection.Parameters`-based objects.
        :param callable on_done: as defined in
            `connection_workflow.AbstractAMQPConnectionWorkflow.start()`.
        :param object | None custom_ioloop: Provide a custom I/O loop that is
            native to the specific adapter implementation; if None, the adapter
            will use a default loop instance, which is typically a singleton.
        :param connection_workflow.AbstractAMQPConnectionWorkflow | None workflow:
            Pass an instance of an implementation of the
            `connection_workflow.AbstractAMQPConnectionWorkflow` interface;
            defaults to a `connection_workflow.AMQPConnectionWorkflow` instance
            with default values for optional args.
        :returns: Connection workflow instance in use. The user should limit
            their interaction with this object only to its `abort()` method.
        :rtype: connection_workflow.AbstractAMQPConnectionWorkflow

        """
        raise NotImplementedError

    @classmethod
    def _start_connection_workflow(cls, connection_configs, connection_factory,
                                   nbio, workflow, on_done):
        """Helper function for custom implementations of `create_connection()`.

        :param sequence connection_configs: A sequence of one or more
            `pika.connection.Parameters`-based objects.
        :param callable connection_factory: A function that takes
            `pika.connection.Parameters` as its only arg and returns a brand new
            `pika.connection.Connection`-based adapter instance each time it is
            called. The factory must instantiate the connection with
            `internal_connection_workflow=False`.
        :param pika.adapters.utils.nbio_interface.AbstractIOServices nbio:
        :param connection_workflow.AbstractAMQPConnectionWorkflow | None workflow:
            Pass an instance of an implementation of the
            `connection_workflow.AbstractAMQPConnectionWorkflow` interface;
            defaults to a `connection_workflow.AMQPConnectionWorkflow` instance
            with default values for optional args.
        :param callable on_done: as defined in
            :py:meth:`connection_workflow.AbstractAMQPConnectionWorkflow.start()`.
        :returns: Connection workflow instance in use. The user should limit
            their interaction with this object only to its `abort()` method.
        :rtype: connection_workflow.AbstractAMQPConnectionWorkflow

        """
        if workflow is None:
            workflow = connection_workflow.AMQPConnectionWorkflow()
            LOGGER.debug('Created default connection workflow %r', workflow)

        if isinstance(workflow, connection_workflow.AMQPConnectionWorkflow):
            workflow.set_io_services(nbio)

        def create_connector():
            """`AMQPConnector` factory."""
            return connection_workflow.AMQPConnector(
                lambda params: _StreamingProtocolShim(
                    connection_factory(params)),
                nbio)

        workflow.start(
            connection_configs=connection_configs,
            connector_factory=create_connector,
            native_loop=nbio.get_native_ioloop(),
            on_done=functools.partial(cls._unshim_connection_workflow_callback,
                                      on_done))

        return workflow

    @property
    def ioloop(self):
        """
        :returns: the native I/O loop instance underlying async services selected
            by user or the default selected by the specialized connection
            adapter (e.g., Twisted reactor, `asyncio.SelectorEventLoop`,
            `select_connection.IOLoop`, etc.)
        :rtype: object
        """
        return self._nbio.get_native_ioloop()

    def _adapter_call_later(self, delay, callback):
        """Implement
        :py:meth:`pika.connection.Connection._adapter_call_later()`.

        """
        return self._nbio.call_later(delay, callback)

    def _adapter_remove_timeout(self, timeout_id):
        """Implement
        :py:meth:`pika.connection.Connection._adapter_remove_timeout()`.

        """
        timeout_id.cancel()

    def _adapter_add_callback_threadsafe(self, callback):
        """Implement
        :py:meth:`pika.connection.Connection._adapter_add_callback_threadsafe()`.

        """
        if not callable(callback):
            raise TypeError(
                'callback must be a callable, but got %r' % (callback,))

        self._nbio.add_callback_threadsafe(callback)

    def _adapter_connect_stream(self):
        """Initiate full-stack connection establishment asynchronously for
        internally-initiated connection bring-up.

        Upon failed completion, we will invoke
        `Connection._on_stream_terminated()`. NOTE: On success,
        the stack will be up already, so there is no corresponding callback.

        """
        self._connection_workflow = connection_workflow.AMQPConnectionWorkflow(
            _until_first_amqp_attempt=True)

        self._connection_workflow.set_io_services(self._nbio)

        def create_connector():
            """`AMQPConnector` factory"""
            return connection_workflow.AMQPConnector(
                lambda _params: _StreamingProtocolShim(self), self._nbio)

        self._connection_workflow.start(
            [self.params],
            connector_factory=create_connector,
            native_loop=self._nbio.get_native_ioloop(),
            on_done=functools.partial(self._unshim_connection_workflow_callback,
                                      self._on_connection_workflow_done))

    @staticmethod
    def _unshim_connection_workflow_callback(user_on_done, shim_or_exc):
        """

        :param callable user_on_done: user's `on_done` callback as defined in
            :py:meth:`connection_workflow.AbstractAMQPConnectionWorkflow.start()`.
        :param _StreamingProtocolShim | Exception shim_or_exc:
        """
        result = shim_or_exc
        if isinstance(result, _StreamingProtocolShim):
            result = result.conn

        user_on_done(result)

    def _abort_connection_workflow(self):
        """Asynchronously abort connection workflow. Upon
        completion, `Connection._on_stream_terminated()` will be called with None
        as the error argument.

        Assumption: may be called only while connection is opening.

        """
        assert not self._opened, (
            '_abort_connection_workflow() may be called only when '
            'connection is opening.')

        if self._transport is None:
            # NOTE: this is possible only when user calls Connection.close() to
            # interrupt internally-initiated connection establishment.
            # self._connection_workflow.abort() would not call
            # Connection.close() before pairing of connection with transport.
            assert self._internal_connection_workflow, (
                'Unexpected _abort_connection_workflow() call with '
                'no transport in external connection workflow mode.')

            # This will result in call to _on_connection_workflow_done() upon
            # completion
            self._connection_workflow.abort()
        else:
            # NOTE: we can't use self._connection_workflow.abort() in this case,
            # because it would result in infinite recursion as we're called
            # from Connection.close() and _connection_workflow.abort() calls
            # Connection.close() to abort a connection that's already been
            # paired with a transport. During internally-initiated connection
            # establishment, AMQPConnectionWorkflow will discover that user
            # aborted the connection when it receives
            # pika.exceptions.ConnectionOpenAborted.

            # This completes asynchronously, culminating in call to our method
            # `connection_lost()`
            self._transport.abort()

    def _on_connection_workflow_done(self, conn_or_exc):
        """`AMQPConnectionWorkflow` completion callback.

        :param BaseConnection | Exception conn_or_exc: Our own connection
            instance on success; exception on failure. See
            `AbstractAMQPConnectionWorkflow.start()` for details.

        """
        LOGGER.debug('Full-stack connection workflow completed: %r',
                     conn_or_exc)

        self._connection_workflow = None

        # Notify protocol of failure
        if isinstance(conn_or_exc, Exception):
            self._transport = None
            if isinstance(conn_or_exc,
                          connection_workflow.AMQPConnectionWorkflowAborted):
                LOGGER.info('Full-stack connection workflow aborted: %r',
                            conn_or_exc)
                # So that _handle_connection_workflow_failure() will know it's
                # not a failure
                conn_or_exc = None
            else:
                LOGGER.error('Full-stack connection workflow failed: %r',
                             conn_or_exc)
                if (isinstance(conn_or_exc,
                               connection_workflow.AMQPConnectionWorkflowFailed)
                        and isinstance(
                            conn_or_exc.exceptions[-1], connection_workflow.
                            AMQPConnectorSocketConnectError)):
                    conn_or_exc = pika.exceptions.AMQPConnectionError(
                        conn_or_exc)

            self._handle_connection_workflow_failure(conn_or_exc)
        else:
            # NOTE: On success, the stack will be up already, so there is no
            # corresponding callback.
            assert conn_or_exc is self, \
                'Expected self conn={!r} from workflow, but got {!r}.'.format(
                    self, conn_or_exc)

    def _handle_connection_workflow_failure(self, error):
        """Handle failure of self-initiated stack bring-up and call
        `Connection._on_stream_terminated()` if connection is not in closed state
        yet. Called by adapter layer when the full-stack connection workflow
        fails.

        :param Exception | None error: exception instance describing the reason
            for failure or None if the connection workflow was aborted.
        """
        if error is None:
            LOGGER.info('Self-initiated stack bring-up aborted.')
        else:
            LOGGER.error('Self-initiated stack bring-up failed: %r', error)

        if not self.is_closed:
            self._on_stream_terminated(error)
        else:
            # This may happen when AMQP layer bring up was started but did not
            # complete
            LOGGER.debug('_handle_connection_workflow_failure(): '
                         'suppressing - connection already closed.')

    def _adapter_disconnect_stream(self):
        """Asynchronously bring down the streaming transport layer and invoke
        `Connection._on_stream_terminated()` asynchronously when complete.

        """
        if not self._opened:
            self._abort_connection_workflow()
        else:
            # This completes asynchronously, culminating in call to our method
            # `connection_lost()`
            self._transport.abort()

    def _adapter_emit_data(self, data):
        """Take ownership of data and send it to AMQP server as soon as
        possible.

        :param bytes data:

        """
        self._transport.write(data)

    def _proto_connection_made(self, transport):
        """Introduces transport to protocol after transport is connected.

        :py:class:`.utils.nbio_interface.AbstractStreamProtocol` implementation.

        :param nbio_interface.AbstractStreamTransport transport:
        :raises Exception: Exception-based exception on error

        """
        self._transport = transport

        # Let connection know that stream is available
        self._on_stream_connected()

    def _proto_connection_lost(self, error):
        """Called upon loss or closing of TCP connection.

        :py:class:`.utils.nbio_interface.AbstractStreamProtocol` implementation.

        NOTE: `connection_made()` and `connection_lost()` are each called just
        once and in that order. All other callbacks are called between them.

        :param BaseException | None error: An exception (check for
            `BaseException`) indicates connection failure. None indicates that
            connection was closed on this side, such as when it's aborted or
            when `AbstractStreamProtocol.eof_received()` returns a falsy result.
        :raises Exception: Exception-based exception on error

        """
        self._transport = None

        if error is None:
            # Either result of `eof_received()` or abort
            if self._got_eof:
                error = pika.exceptions.StreamLostError(
                    'Transport indicated EOF')
        else:
            error = pika.exceptions.StreamLostError(
                'Stream connection lost: {!r}'.format(error))

        LOGGER.log(logging.DEBUG if error is None else logging.ERROR,
                   'connection_lost: %r', error)

        self._on_stream_terminated(error)

    def _proto_eof_received(self):  # pylint: disable=R0201
        """Called after the remote peer shuts its write end of the connection.
        :py:class:`.utils.nbio_interface.AbstractStreamProtocol` implementation.

        :returns: A falsy value (including None) will cause the transport to
            close itself, resulting in an eventual `connection_lost()` call
            from the transport. If a truthy value is returned, it will be the
            protocol's responsibility to close/abort the transport.
        :rtype: falsy|truthy
        :raises Exception: Exception-based exception on error

        """
        LOGGER.error('Transport indicated EOF.')

        self._got_eof = True

        # This is how a reset connection will typically present itself
        # when we have nothing to send to the server over plaintext stream.
        #
        # Have transport tear down the connection and invoke our
        # `connection_lost` method
        return False

    def _proto_data_received(self, data):
        """Called to deliver incoming data from the server to the protocol.

        :py:class:`.utils.nbio_interface.AbstractStreamProtocol` implementation.

        :param data: Non-empty data bytes.
        :raises Exception: Exception-based exception on error

        """
        self._on_data_available(data)


class _StreamingProtocolShim(nbio_interface.AbstractStreamProtocol):
    """Shim for callbacks from transport so that BaseConnection can
    delegate to private methods, thus avoiding contamination of API with
    methods that look public, but aren't.

    """

    # Override AbstractStreamProtocol abstract methods to enable instantiation
    connection_made = None
    connection_lost = None
    eof_received = None
    data_received = None

    def __init__(self, conn):
        """
        :param BaseConnection conn:
        """
        self.conn = conn
        # pylint: disable=W0212
        self.connection_made = conn._proto_connection_made
        self.connection_lost = conn._proto_connection_lost
        self.eof_received = conn._proto_eof_received
        self.data_received = conn._proto_data_received

    def __getattr__(self, attr):
        """Proxy inexistent attribute requests to our connection instance
        so that AMQPConnectionWorkflow/AMQPConnector may treat the shim as an
        actual connection.

        """
        return getattr(self.conn, attr)

    def __repr__(self):
        return '{}: {!r}'.format(self.__class__.__name__, self.conn)

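The multi-endpoint behaviour described in ``create_connection()``'s docstring
can be exercised through any concrete adapter. A minimal sketch with
``AsyncioConnection`` (hostnames are hypothetical, and the asyncio loop must be
running for the workflow to progress):

.. code :: python

    import asyncio

    import pika
    from pika.adapters.asyncio_connection import AsyncioConnection

    def on_done(conn_or_exc):
        # Receives the established connection, or an exception if every
        # config (and every resolved address) failed.
        print('workflow finished with: %r' % (conn_or_exc,))

    AsyncioConnection.create_connection(
        [pika.ConnectionParameters('rabbit-primary'),
         pika.ConnectionParameters('rabbit-fallback')],
        on_done=on_done)
    asyncio.get_event_loop().run_forever()
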
pika/adapters/gevent_connection.py
@@ -0,0 +1,469 @@
"""Use pika with the Gevent IOLoop."""

import functools
import logging
import os
import threading
import weakref

try:
    import queue
except ImportError:  # Python <= v2.7
    import Queue as queue

import gevent
import gevent.hub
import gevent.socket

import pika.compat
from pika.adapters.base_connection import BaseConnection
from pika.adapters.utils.io_services_utils import check_callback_arg
from pika.adapters.utils.nbio_interface import (
    AbstractIOReference,
    AbstractIOServices,
)
from pika.adapters.utils.selector_ioloop_adapter import (
    AbstractSelectorIOLoop,
    SelectorIOServicesAdapter,
)

LOGGER = logging.getLogger(__name__)


class GeventConnection(BaseConnection):
    """Implementation of pika's ``BaseConnection``.

    An async selector-based connection which integrates with Gevent.
    """

    def __init__(self,
                 parameters=None,
                 on_open_callback=None,
                 on_open_error_callback=None,
                 on_close_callback=None,
                 custom_ioloop=None,
                 internal_connection_workflow=True):
        """Create a new GeventConnection instance and connect to RabbitMQ on
        Gevent's event-loop.

        :param pika.connection.Parameters|None parameters: The connection
            parameters
        :param callable|None on_open_callback: The method to call when the
            connection is open
        :param callable|None on_open_error_callback: Called if the connection
            can't be established or connection establishment is interrupted by
            `Connection.close()`:
            on_open_error_callback(Connection, exception)
        :param callable|None on_close_callback: Called when a previously fully
            open connection is closed:
            `on_close_callback(Connection, exception)`, where `exception` is
            either an instance of `exceptions.ConnectionClosed` if closed by
            user or broker or exception of another type that describes the
            cause of connection failure
        :param gevent._interfaces.ILoop|nbio_interface.AbstractIOServices|None
            custom_ioloop: Use a custom Gevent ILoop.
        :param bool internal_connection_workflow: True for autonomous connection
            establishment which is default; False for externally-managed
            connection workflow via the `create_connection()` factory
        """
        if pika.compat.ON_WINDOWS:
            raise RuntimeError('GeventConnection is not supported on Windows.')

        custom_ioloop = (custom_ioloop or
                         _GeventSelectorIOLoop(gevent.get_hub()))

        if isinstance(custom_ioloop, AbstractIOServices):
            nbio = custom_ioloop
        else:
            nbio = _GeventSelectorIOServicesAdapter(custom_ioloop)

        super(GeventConnection, self).__init__(
            parameters,
            on_open_callback,
            on_open_error_callback,
            on_close_callback,
            nbio,
            internal_connection_workflow=internal_connection_workflow)

    @classmethod
    def create_connection(cls,
                          connection_configs,
                          on_done,
                          custom_ioloop=None,
                          workflow=None):
        """Implement
        :py:classmethod::`pika.adapters.BaseConnection.create_connection()`.
        """
        custom_ioloop = (custom_ioloop or
                         _GeventSelectorIOLoop(gevent.get_hub()))

        nbio = _GeventSelectorIOServicesAdapter(custom_ioloop)

        def connection_factory(params):
            """Connection factory."""
            if params is None:
                raise ValueError('Expected pika.connection.Parameters '
                                 'instance, but got None in params arg.')
            return cls(parameters=params,
                       custom_ioloop=nbio,
                       internal_connection_workflow=False)

        return cls._start_connection_workflow(
            connection_configs=connection_configs,
            connection_factory=connection_factory,
            nbio=nbio,
            workflow=workflow,
            on_done=on_done)


class _TSafeCallbackQueue(object):
    """Dispatch callbacks from any thread to be executed in the main thread
    efficiently with IO events.
    """

    def __init__(self):
        """
        :param _GeventSelectorIOLoop loop: IO loop to add callbacks to.
        """
        # Thread-safe, blocking queue.
        self._queue = queue.Queue()
        # PIPE to trigger an event when the queue is ready.
        self._read_fd, self._write_fd = os.pipe()
        # Lock around writes to the PIPE in case some platform/implementation
        # requires this.
        self._write_lock = threading.RLock()

    @property
    def fd(self):
        """The file-descriptor to register for READ events in the IO loop."""
        return self._read_fd

    def add_callback_threadsafe(self, callback):
        """Add an item to the queue from any thread. The configured handler
        will be invoked with the item in the main thread.

        :param item: Object to add to the queue.
        """
        self._queue.put(callback)
        with self._write_lock:
            # The value written is not important.
            os.write(self._write_fd, b'\xFF')

    def run_next_callback(self):
        """Invoke the next callback from the queue.

        MUST run in the main thread. If no callback was added to the queue,
        this will block the IO loop.

        Performs a blocking READ on the pipe so must only be called when the
        pipe is ready for reading.
        """
        try:
            callback = self._queue.get_nowait()
        except queue.Empty:
            # Should never happen.
            LOGGER.warning("Callback queue was empty.")
        else:
            # Read the byte from the pipe so the event doesn't re-fire.
            os.read(self._read_fd, 1)
            callback()


class _GeventSelectorIOLoop(AbstractSelectorIOLoop):
    """Implementation of `AbstractSelectorIOLoop` using the Gevent event loop.

    Required by implementations of `SelectorIOServicesAdapter`.
    """
    # Gevent's READ and WRITE masks are defined as 1 and 2 respectively. No
    # ERROR mask is defined.
    # See http://www.gevent.org/api/gevent.hub.html#gevent._interfaces.ILoop.io
    READ = 1
    WRITE = 2
    ERROR = 0

    def __init__(self, gevent_hub=None):
        """
        :param gevent._interfaces.ILoop gevent_loop:
        """
        self._hub = gevent_hub or gevent.get_hub()
        self._io_watchers_by_fd = {}
        # Used to start/stop the loop.
        self._waiter = gevent.hub.Waiter()

        # For adding callbacks from other threads. See `add_callback(..)`.
        self._callback_queue = _TSafeCallbackQueue()

        def run_callback_in_main_thread(fd, events):
            """Swallow the fd and events arguments."""
            del fd
            del events
            self._callback_queue.run_next_callback()

        self.add_handler(self._callback_queue.fd, run_callback_in_main_thread,
                         self.READ)

    def close(self):
        """Release the loop's resources."""
        self._hub.loop.destroy()
        self._hub = None

    def start(self):
        """Run the I/O loop. It will loop until requested to exit. See `stop()`.
        """
        LOGGER.debug("Passing control to Gevent's IOLoop")
        self._waiter.get()  # Block until 'stop()' is called.

        LOGGER.debug("Control was passed back from Gevent's IOLoop")
        self._waiter.clear()

    def stop(self):
        """Request exit from the ioloop. The loop is NOT guaranteed to
        stop before this method returns.

        To invoke `stop()` safely from a thread other than this IOLoop's thread,
        call it via `add_callback_threadsafe`; e.g.,

            `ioloop.add_callback(ioloop.stop)`
        """
        self._waiter.switch(None)

    def add_callback(self, callback):
        """Requests a call to the given function as soon as possible in the
        context of this IOLoop's thread.

        NOTE: This is the only thread-safe method in IOLoop. All other
        manipulations of IOLoop must be performed from the IOLoop's thread.

        For example, a thread may request a call to the `stop` method of an
        ioloop that is running in a different thread via
        `ioloop.add_callback_threadsafe(ioloop.stop)`

        :param callable callback: The callback method
        """
        if gevent.get_hub() == self._hub:
            # We're in the main thread; just add the callback.
            LOGGER.debug("Adding callback from main thread")
            self._hub.loop.run_callback(callback)
        else:
            # This isn't the main thread and Gevent's hub/loop don't provide
            # any thread-safety so enqueue the callback for it to be registered
            # in the main thread.
            LOGGER.debug("Adding callback from another thread")
            callback = functools.partial(self._hub.loop.run_callback, callback)
            self._callback_queue.add_callback_threadsafe(callback)

    def call_later(self, delay, callback):
        """Add the callback to the IOLoop timer to be called after delay seconds
        from the time of call on best-effort basis. Returns a handle to the
        timeout.

        :param float delay: The number of seconds to wait to call callback
        :param callable callback: The callback method
        :returns: handle to the created timeout that may be passed to
            `remove_timeout()`
        :rtype: object
        """
        timer = self._hub.loop.timer(delay)
        timer.start(callback)
        return timer

    def remove_timeout(self, timeout_handle):
        """Remove a timeout

        :param timeout_handle: Handle of timeout to remove
        """
        timeout_handle.close()

    def add_handler(self, fd, handler, events):
        """Start watching the given file descriptor for events

        :param int fd: The file descriptor
        :param callable handler: When requested event(s) occur,
            `handler(fd, events)` will be called.
        :param int events: The event mask (READ|WRITE)
        """
        io_watcher = self._hub.loop.io(fd, events)
        self._io_watchers_by_fd[fd] = io_watcher
        io_watcher.start(handler, fd, events)

    def update_handler(self, fd, events):
        """Change the events being watched for.

        :param int fd: The file descriptor
        :param int events: The new event mask (READ|WRITE)
        """
        io_watcher = self._io_watchers_by_fd[fd]
        # Save the callback from the original watcher. Then close the old
        # watcher and create a new one using the saved callback and the new
        # events.
        callback = io_watcher.callback
        io_watcher.close()
        del self._io_watchers_by_fd[fd]
        self.add_handler(fd, callback, events)

    def remove_handler(self, fd):
        """Stop watching the given file descriptor for events

        :param int fd: The file descriptor
        """
        io_watcher = self._io_watchers_by_fd[fd]
        io_watcher.close()
        del self._io_watchers_by_fd[fd]


class _GeventSelectorIOServicesAdapter(SelectorIOServicesAdapter):
    """SelectorIOServicesAdapter implementation using Gevent's DNS resolver."""

    def getaddrinfo(self,
                    host,
                    port,
                    on_done,
                    family=0,
                    socktype=0,
                    proto=0,
                    flags=0):
        """Implement :py:meth:`.nbio_interface.AbstractIOServices.getaddrinfo()`.
        """
        resolver = _GeventAddressResolver(native_loop=self._loop,
                                          host=host,
                                          port=port,
                                          family=family,
                                          socktype=socktype,
                                          proto=proto,
                                          flags=flags,
                                          on_done=on_done)
        resolver.start()
        # Return needs an implementation of `AbstractIOReference`.
        return _GeventIOLoopIOHandle(resolver)


class _GeventIOLoopIOHandle(AbstractIOReference):
    """Implement `AbstractIOReference`.

    Only used to wrap the _GeventAddressResolver.
    """

    def __init__(self, subject):
        """
        :param subject: subject of the reference containing a `cancel()` method
        """
        self._cancel = subject.cancel

    def cancel(self):
        """Cancel pending operation

        :returns: False if was already done or cancelled; True otherwise
        :rtype: bool
        """
        return self._cancel()


class _GeventAddressResolver(object):
    """Performs getaddrinfo asynchronously using Gevent's configured resolver
    in a separate greenlet, invoking the provided callback with the result.

    See: http://www.gevent.org/dns.html
    """
    __slots__ = (
        '_loop',
        '_on_done',
        '_greenlet',
        # getaddrinfo(..) args:
        '_ga_host',
        '_ga_port',
        '_ga_family',
        '_ga_socktype',
        '_ga_proto',
        '_ga_flags')

    def __init__(self, native_loop, host, port, family, socktype, proto, flags,
                 on_done):
        """Initialize the `_GeventAddressResolver`.

        :param AbstractSelectorIOLoop native_loop:
        :param host: `see socket.getaddrinfo()`
        :param port: `see socket.getaddrinfo()`
        :param family: `see socket.getaddrinfo()`
        :param socktype: `see socket.getaddrinfo()`
        :param proto: `see socket.getaddrinfo()`
        :param flags: `see socket.getaddrinfo()`
        :param on_done: on_done(records|BaseException) callback for reporting
            result from the given I/O loop. The single arg will be either an
            exception object (check for `BaseException`) in case of failure or
            the result returned by `socket.getaddrinfo()`.
        """
        check_callback_arg(on_done, 'on_done')

        self._loop = native_loop
        self._on_done = on_done
        # Reference to the greenlet performing `getaddrinfo`.
        self._greenlet = None
        # getaddrinfo(..) args.
        self._ga_host = host
        self._ga_port = port
        self._ga_family = family
        self._ga_socktype = socktype
        self._ga_proto = proto
        self._ga_flags = flags

    def start(self):
        """Start an asynchronous getaddrinfo invocation."""
        if self._greenlet is None:
            self._greenlet = gevent.spawn_raw(self._resolve)
        else:
            LOGGER.warning("_GeventAddressResolver already started")

    def cancel(self):
        """Cancel the pending resolver."""
        changed = False

        if self._greenlet is not None:
            changed = True
            self._stop_greenlet()

        self._cleanup()
        return changed

    def _cleanup(self):
        """Stop the resolver and release any resources."""
        self._stop_greenlet()
        self._loop = None
        self._on_done = None

    def _stop_greenlet(self):
        """Stop the greenlet performing getaddrinfo if running.

        Otherwise, this is a no-op.
        """
        if self._greenlet is not None:
            gevent.kill(self._greenlet)
            self._greenlet = None

    def _resolve(self):
        """Call `getaddrinfo()` and return result via user's callback
        function on the configured IO loop.
        """
        try:
            # NOTE(JG): Can't use kwargs with getaddrinfo on Python <= v2.7.
            result = gevent.socket.getaddrinfo(self._ga_host, self._ga_port,
                                               self._ga_family,
                                               self._ga_socktype,
                                               self._ga_proto, self._ga_flags)
        except Exception as exc:  # pylint: disable=broad-except
            LOGGER.error('Address resolution failed: %r', exc)
            result = exc

        callback = functools.partial(self._dispatch_callback, result)
        self._loop.add_callback(callback)

    def _dispatch_callback(self, result):
        """Invoke the configured completion callback and any subsequent cleanup.

        :param result: result from getaddrinfo, or the exception if raised.
        """
        try:
            LOGGER.debug(
                'Invoking async getaddrinfo() completion callback; host=%r',
                self._ga_host)
            self._on_done(result)
        finally:
            self._cleanup()

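The pipe-based wakeup that ``_TSafeCallbackQueue`` builds on is the classic
self-pipe trick; stripped of pika specifics it amounts to the following
standalone sketch:

.. code :: python

    import os
    import queue

    tasks = queue.Queue()
    read_fd, write_fd = os.pipe()

    def submit(fn):                  # may be called from any thread
        tasks.put(fn)
        os.write(write_fd, b'\xFF')  # one byte wakes whoever watches read_fd

    def on_readable():               # run by the loop when read_fd is readable
        os.read(read_fd, 1)          # drain the wake-up byte
        tasks.get_nowait()()         # execute the queued callback
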
pika/adapters/tornado_connection.py
@@ -0,0 +1,90 @@
"""Use pika with the Tornado IOLoop

"""
import logging

from tornado import ioloop

from pika.adapters.utils import nbio_interface, selector_ioloop_adapter
from pika.adapters import base_connection

LOGGER = logging.getLogger(__name__)


class TornadoConnection(base_connection.BaseConnection):
    """The TornadoConnection runs on the Tornado IOLoop.
    """

    def __init__(self,
                 parameters=None,
                 on_open_callback=None,
                 on_open_error_callback=None,
                 on_close_callback=None,
                 custom_ioloop=None,
                 internal_connection_workflow=True):
        """Create a new instance of the TornadoConnection class, connecting
        to RabbitMQ automatically.

        :param pika.connection.Parameters|None parameters: The connection
            parameters
        :param callable|None on_open_callback: The method to call when the
            connection is open
        :param callable|None on_open_error_callback: Called if the connection
            can't be established or connection establishment is interrupted by
            `Connection.close()`:
            on_open_error_callback(Connection, exception)
        :param callable|None on_close_callback: Called when a previously fully
            open connection is closed:
            `on_close_callback(Connection, exception)`, where `exception` is
            either an instance of `exceptions.ConnectionClosed` if closed by
            user or broker or exception of another type that describes the
            cause of connection failure
        :param ioloop.IOLoop|nbio_interface.AbstractIOServices|None custom_ioloop:
            Override using the global IOLoop in Tornado
        :param bool internal_connection_workflow: True for autonomous connection
            establishment which is default; False for externally-managed
            connection workflow via the `create_connection()` factory

        """
        if isinstance(custom_ioloop, nbio_interface.AbstractIOServices):
            nbio = custom_ioloop
        else:
            nbio = (selector_ioloop_adapter.SelectorIOServicesAdapter(
                custom_ioloop or ioloop.IOLoop.instance()))
        super(TornadoConnection, self).__init__(
            parameters,
            on_open_callback,
            on_open_error_callback,
            on_close_callback,
            nbio,
            internal_connection_workflow=internal_connection_workflow)

    @classmethod
    def create_connection(cls,
                          connection_configs,
                          on_done,
                          custom_ioloop=None,
                          workflow=None):
        """Implement
        :py:classmethod::`pika.adapters.BaseConnection.create_connection()`.

        """
        nbio = selector_ioloop_adapter.SelectorIOServicesAdapter(
            custom_ioloop or ioloop.IOLoop.instance())

        def connection_factory(params):
            """Connection factory."""
            if params is None:
                raise ValueError('Expected pika.connection.Parameters '
                                 'instance, but got None in params arg.')
            return cls(
                parameters=params,
                custom_ioloop=nbio,
                internal_connection_workflow=False)

        return cls._start_connection_workflow(
            connection_configs=connection_configs,
            connection_factory=connection_factory,
            nbio=nbio,
            workflow=workflow,
            on_done=on_done)

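Usage mirrors the other adapters; a minimal sketch that connects on Tornado's
loop (a local broker is assumed):

.. code :: python

    import pika
    from pika.adapters.tornado_connection import TornadoConnection
    from tornado import ioloop

    def on_open(connection):
        connection.channel(on_open_callback=lambda channel: None)

    TornadoConnection(pika.ConnectionParameters('localhost'),
                      on_open_callback=on_open)
    ioloop.IOLoop.current().start()  # hand control to Tornado's IOLoop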