Python: Ergebnis von asynchronem HTTP POST Request verarbeiten

Sempervivum

Erfahrenes Mitglied
Mir scheint, jetzt verstehe ich erst so einiger Maßen, was Du vor hast: Du schickst 3 Anfragen und zu jeder davon gehört genau eine User-ID. Und wenn die Antwort vom Server kommt, musst Du dieser wieder die richtige User-ID zuordnen. Was die Sache kompliziert macht ist, dass das Ganze asynchron abläuft und nicht sicher gestellt ist, dass die Antworten in der selben Reihenfolge eintreffen wie die Anfragen abgeschickt wurden. Richtig?
 

canju

Erfahrenes Mitglied
Ja, genau. Ziemlich ambitioniert für ein Anfänger-Projekt. Ich hatte ursprünglich nach einer asynchronen PHP-Lösung gesucht, dabei bin ich auf diesen Pythonansatz gestoßen. Da ich mich schon länger mit Python beschäftigen wollte, hab ichs einfach probiert.

Hatte schon versucht in diesem Codeblock hier:
Python:
async def post_async(url, session, headers, body):
    async with session.post(url, headers=headers, json=body) as response:
        text = await response.text()
        return json.loads(text)
irgendwie die user_id mit zu übergeben, da ja hier auch genau der individuelle Body (einer pro user_id) übermittelt wird. Aber langsam schwindet die Konzentration für heute :)
 

Sempervivum

Erfahrenes Mitglied
Was Python betrifft bin ich selber auch noch ziemlich am Anfang. Ich setze es hauptsächlich für das Dateihandling auf meinem PC ein weil ich mich mit Powershell nicht anfreunden konnte. Und davon, diesen Code für das asynchrone Laden zu verstehen, bin ich noch ein wenig entfernt.
 

Sempervivum

Erfahrenes Mitglied
Hallo canju,
hast Du inzwischen eine Lösung gefunden?
Ich dachte mir, es könne nicht schaden und interessant sein, in das Thema mit den asynchronen Anfragen bei Python einzusteigen aber ich bin da auf einen wahren Irrgarten von Techniken und Bibliotheken gestoßen.
 

canju

Erfahrenes Mitglied
Hey @Sempervivum,

sorry für die späte Rückmeldung. Leider bin ich bei diesem Projekt noch nicht wirklich weitergekommen.
Ich hatte auch zwischenzeitlich noch knapp 2 wochen Urlaub, aber trotzdem noch heiß drauf das Thema inkl. Übergabe eigener Parameter zum "Laufen" zu bringen.

...aber ich bin da auf einen wahren Irrgarten von Techniken und Bibliotheken gestoßen
Ja, es gibt mehrere Pyhton-Libraries mit denen man asyncs senden kann. Ich hatte mich für für
aiohttp und asyncio entschieden, da es im vergleich zu requests nochmal schneller war. Finde den Post leider gerade nicht auf die Schnelle, der die Geschwindigkeit der einzelnen Libraries gegenübergestellt hat. Sobald ich ihn finde reiche ich ihn dir nach.
 

Technipion

Erfahrenes Mitglied
Ich hatte auch zwischenzeitlich noch knapp 2 wochen Urlaub, aber trotzdem noch heiß drauf das Thema inkl. Übergabe eigener Parameter zum "Laufen" zu bringen.
Überhaupt kein Problem!

Kannst du vielleicht nochmal kurz deinen aktuellen Stand (am besten mit Code) posten, und beschreiben an welchem Punkt du im Moment hängst? Dann können wir den Faden von dort aus weiterspinnen :LOL:

Gruß Technipion
 

Sempervivum

Erfahrenes Mitglied
Das Interesse am Thema hatte sich bei mir ein wenig verselbständigt und ich bin der Sache weiter nach gegangen. Bin auf dieses Thema gestoßen:
How could I use requests in asyncio?
und der Ansatz von ospider schien mir am modernsten zu sein.
Ich habe die Sache mit der ID, die der Antwort zugeordnet werden muss, zunächst so gelöst, dass ich einer Funktion fetch, die die Anfrage erledigt, die ID als Parameter hinzu gefügt habe und, wenn die Antwort da war, auch in diese eingetragen:
Code:
    import time
    import aiohttp
    import asyncio

    params = [1, 2, 3, 4, 5, 6, 7, 8, 9]
    ids = [11, 12, 13, 14, 15, 16, 17, 18, 19]
    url = r'http://localhost//_python/async-requests/the-api.php'

    # Fetch data for one parameter asynchronously:
    async def fetch(session, url, params, id):
        # Send request with method POST:
        start = time.perf_counter()
        async with session.post(url, data=params) as response:
            # Decode JSON in response:
            resp = await response.json()
            print(str(time.perf_counter() - start) + ' ' + resp['param1'])
            # Add ID to response:
            resp['id'] = id
            # return response:
            return resp

    async def main():
        async with aiohttp.ClientSession() as session:
            idx = 0
            tasks = []
            # Loop through params:
            for param in params:
                # Append task for fetching the current param to list,
                # ID as an additional parameter:
                tasks.append(fetch(session, url, {'param1': param}, ids[idx]))
                idx += 1
                # 3 tasks prepared?
                if(idx % 3 == 0):
                    # Process tasks asynchronously:
                    resps = await asyncio.gather(*tasks)
                    # Process responses:
                    for resp in resps:
                        print(resp)
                    # reset task list:
                    tasks = []

    if __name__ == '__main__':
        loop = asyncio.get_event_loop()
        loop.run_until_complete(main())
Um die API zu simulieren, habe ich dieses einfache PHP-Skript verwendet:
Code:
<?php
header('Content-Type: application/json; charset=utf-8');
$start=microtime(true);
if ($_POST['param1']==1) {
    sleep(3);
} else {
    sleep(1);
}
$resp=["param1"=>$_POST['param1'], "time"=>microtime(true)-$start];
echo json_encode($resp);
Bei dieser Lösung ist mir dann aufgefallen, dass die Antworten eines 3-er Blocks präzise in der selben Reihenfolge verarbeitet werden, wie sie gesendet wurden, obwohl asynchron. Um das zu erhärten, habe ich bei der ersten Anfrage eine größere Verzögerung eingebaut, siehe PHP oben und auch dann blieb die korrekt Reihenfolge erhalten. Ich habe dann den Code geändert, so dass die ID nicht durch die Funktion fetch durchgeschleift wird und das hat funktioniert:
Code:
    import time
    import aiohttp
    import asyncio

    params = [1, 2, 3, 4, 5, 6, 7, 8, 9]
    ids = [11, 12, 13, 14, 15, 16, 17, 18, 19]
    url = r'http://localhost//_python/async-requests/the-api.php'

    # Fetch data for one parameter asynchronously:
    async def fetch(session, url, params):
        # Send request with method POST:
        start = time.perf_counter()
        async with session.post(url, data=params) as response:
            # Decode JSON in response:
            resp = await response.json()
            print(str(time.perf_counter() - start) + ' ' + resp['param1'])
            # return response:
            return resp

    async def main():
        async with aiohttp.ClientSession() as session:
            idx = 0
            idxStart = 0
            tasks = []
            # Loop through params:
            for param in params:
                # Append task for fetching the current param to list,
                tasks.append(fetch(session, url, {'param1': param}))
                idx += 1
                # 3 tasks prepared?
                if(idx % 3 == 0):
                    # Process tasks asynchronously:
                    resps = await asyncio.gather(*tasks)
                    # print all responses including corresponding ID:
                    for resp in resps:
                        print(str(resp) + ' ' + ids[idxStart])
                        idxStart += 1
                    # reset task list:
                    tasks = []

    if __name__ == '__main__':
        loop = asyncio.get_event_loop()
        loop.run_until_complete(main())
Ich habe im englisch-sprachigen Python-Forum nachgefragt, ob dieses Vorgehen sicher ist und man sich auf die Reihenfolge verlassen kann aber keine Antwort auf diese Frage bekommen.
 

canju

Erfahrenes Mitglied
Da bist du mir zuvor gekommen. Ich habe mir nochmal meinen letzten Stand vor meinem Urlaub angeschaut. Ich hatte die ID sogar mit durchgteschliffen bekommen, allerdings nicht korrekt (s. weiter unten).

Daher habe ich jetzt mal einen Testcase vorbereitet der einen öffentlichen Endpunkt anspricht und die entsprechende INPUT-Tabelle sowie OUTPUT-Tabelle vorberetiet. Der Enpunkt wandelt via Post-Body-Parameter übergebenen Text einfach in Großbuchstaben um.

Postman-Collection:

JSON:
{
    "info": {
        "_postman_id": "0abee74f-1d53-4202-a2be-777b9e55b412",
        "name": "python_async_test",
        "schema": "https://schema.getpostman.com/json/collection/v2.1.0/collection.json"
    },
    "item": [
        {
            "name": "ShoutOut",
            "request": {
                "method": "POST",
                "header": [
                    {
                        "key": "Content-Type",
                        "value": "application/json"
                    }
                ],
                "body": {
                    "mode": "raw",
                    "raw": "{\"INPUT\": \"hello world\"}"
                },
                "url": {
                    "raw": "HTTP://API.SHOUTCLOUD.IO/V1/SHOUT",
                    "protocol": "HTTP",
                    "host": [
                        "API",
                        "SHOUTCLOUD",
                        "IO"
                    ],
                    "path": [
                        "V1",
                        "SHOUT"
                    ]
                }
            },
            "response": []
        }
    ]
}

INPUT-Tabelle erstellen (fruits_input) und Werte einfügen - hier ist die ID enthalten, die ich später der Antwort zuordnen will:

SQL:
CREATE TABLE IF NOT EXISTS `fruits_input` (
  `fruit_id` int(11) DEFAULT NULL,
  `fruit_name_input` text COLLATE utf8mb4_unicode_ci DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;


INSERT INTO `fruits_input` (`fruit_id`, `fruit_name_input`) VALUES
    (1, 'ananas'),
    (2, 'apfel'),
    (3, 'aprikose'),
    (4, 'artischocke'),
    (5, 'auberginen'),
    (6, 'avocado '),
    (7, 'banane'),
    (8, 'beeren'),
    (9, 'birnen'),
    (10, 'blumenkohl'),
    (11, 'brokkoli '),
    (12, 'chinakohl '),
    (13, 'grünkohl'),
    (14, 'gurke '),
    (15, 'hülsenfrüchte '),
    (16, 'kartoffeln '),
    (17, 'kirschen'),
    (18, 'knoblauch'),
    (19, 'knollengemüse'),
    (20, 'kohlgemüse '),
    (21, 'lauch '),
    (22, 'melone'),
    (23, 'möhren '),
    (24, 'nüsse '),
    (25, 'oliven '),
    (26, 'paprika'),
    (27, 'pastinaken'),
    (28, 'petersilienwurzeln '),
    (29, 'pfirsiche'),
    (30, 'pflaume'),
    (31, 'pilze '),
    (32, 'rhabarber'),
    (33, 'rosenkohl'),
    (34, 'rotkohl '),
    (35, 'salat'),
    (36, 'schwarzwurzeln'),
    (37, 'sellerie'),
    (38, 'spargel'),
    (39, 'spinat'),
    (40, 'stängelgemüse'),
    (41, 'steckrüben '),
    (42, 'tomate'),
    (43, 'topinambur '),
    (44, 'weintraube'),
    (45, 'weißkohl'),
    (46, 'wirsing '),
    (47, 'zitrusfrüchte'),
    (48, 'zucchini'),
    (49, 'zwiebel ');


OUTPUT-Tabelle erstellen (fruits_output) - Also dei Tabelle wo die Antwort inkl. der ID espeichert werden soll:
SQL:
CREATE TABLE `fruits_output` (
    `fruit_id` INT(11) NULL DEFAULT NULL,
    `fruit_name_output` TEXT NULL DEFAULT NULL COLLATE 'utf8mb4_unicode_ci'
)
COLLATE='utf8mb4_unicode_ci'
ENGINE=InnoDB
;

Python-Code:

db_config.py

Python:
import mysql.connector

try:
    connection = mysql.connector.connect(host='DB_HOST',
                                         database='DB_NAME',
                                         user='DB_USERNAME',
                                         password='DB_PASSWORD')

except mysql.connector.Error as e:
    print("Error reading data from MySQL table", e)


fruits_test.py - Der Programmcode

Python:
import db_config

from datetime import datetime
import asyncio
import aiohttp
import json
import os

query_table = "fruits_input"
destination_table = "fruits_output"

# TRUNCATE TABE BEFORE INSERTING NEW VALUES
truncate_destination_table = f'''TRUNCATE TABLE {destination_table}'''
db_config.cursor = db_config.connection.cursor(dictionary=True)
db_config.cursor.execute(truncate_destination_table)
db_config.connection.commit()

# QUERY REQUIRED REQUEST PARAMTERS
sql_select_Query = f'''SELECT
                        fruit_id,
                        fruit_name_input
                    FROM {query_table} LIMIT 10'''
db_config.cursor.execute(sql_select_Query)
# get all records
records = db_config.cursor.fetchall()


url = "HTTP://API.SHOUTCLOUD.IO/V1/SHOUT"
headers = {
  'Content-Type': 'application/json'
}

for row in records:
    body = {
        "INPUT": row['fruit_name_input']
    }


async def gather_with_concurrency(n, *tasks):
    semaphore = asyncio.Semaphore(n)
    async def sem_task(task):
        async with semaphore:
            return await task

    return await asyncio.gather(*(sem_task(task) for task in tasks))


async def post_async(url, session, headers, body):
    async with session.post(url, headers=headers, json=body) as response:
        text = await response.text()
        return json.loads(text)

async def main():
    conn = aiohttp.TCPConnector(limit=None)
    session = aiohttp.ClientSession(connector=conn)

    conc_req = 3
    response = await gather_with_concurrency(conc_req, *[post_async(url, session, headers, body) for row in records])
    await session.close()

    print(json.dumps(response, indent=4)) # convert in pretty json

    fruits = response
    for fruit in fruits:

        sql = f'''INSERT INTO {destination_table} (fruit_id, fruit_name_output)
                    VALUES (%s, %s)
        '''
        val = (
            str(row['fruit_id']),
            str(fruit['OUTPUT']),
        )

        db_config.cursor.execute(sql, val)
        db_config.connection.commit()

asyncio.get_event_loop().run_until_complete(main())

if db_config.connection.is_connected():
    db_config.connection.close()
    db_config.cursor.close()
    print("\n \n MySQL connection is closed")

Das gewünschte Ergebnis:

Der fruit_name_output soll in Großbuchstaben mit der dazugehörigen fruit_id aus dem Input in der Tabelle fruits_output gepseichert werden.

Wenn ihr das so abschickt, erhaltet ihr in der Output-Tabelle 10x den gleichen Eintrag:
Code:
fruit_id;fruit_name_output
10;BLUMENKOHL
10;BLUMENKOHL
10;BLUMENKOHL
10;BLUMENKOHL
10;BLUMENKOHL
10;BLUMENKOHL
10;BLUMENKOHL
10;BLUMENKOHL
10;BLUMENKOHL
10;BLUMENKOHL

Ich vermute daher, dass meine Schleifen nicht korrekt sind. Also die Body-Parameter nicht für jeden Eintrag aus der INPUT-Tabelle abgeschickt werden, sondern einfach 10x der gleiche.
 

Sempervivum

Erfahrenes Mitglied
die Body-Parameter nicht für jeden Eintrag aus der INPUT-Tabelle abgeschickt werden, sondern einfach 10x der gleiche.
Ich denke, genau so ist es. Wenn ich den Code so ändere, funktioniert es:
Code:
import db_config

from datetime import datetime
import asyncio
import aiohttp
import json
import os

query_table = "fruits_input"
destination_table = "fruits_output"

# TRUNCATE TABE BEFORE INSERTING NEW VALUES
truncate_destination_table = f'''TRUNCATE TABLE {destination_table}'''
db_config.cursor = db_config.connection.cursor(dictionary=True)
db_config.cursor.execute(truncate_destination_table)
db_config.connection.commit()

# QUERY REQUIRED REQUEST PARAMTERS
sql_select_Query = f'''SELECT
                        fruit_id,
                        fruit_name_input
                    FROM {query_table} LIMIT 10'''
db_config.cursor.execute(sql_select_Query)
# get all records
records = db_config.cursor.fetchall()


url = r'http://localhost//_python/async-requests/fruits/the-api-2.php'
headers = {
    'Content-Type': 'application/json'
}


# Die folgende Schleife wird sofort durchlaufen und am Ende
# enthält body den Fruchtnamen aus der letzten Zeile:
# for row in records:
#     body = {
#         "INPUT": row['fruit_name_input']
#     }


async def gather_with_concurrency(n, *tasks):
    semaphore = asyncio.Semaphore(n)

    async def sem_task(task):
        async with semaphore:
            return await task

    return await asyncio.gather(*(sem_task(task) for task in tasks))

# Wir brauchen den Fruchtnamen aus der Datenbank,
# daher sehen wir die DB-Zeile als Parameter vor:


async def post_async(url, session, row):
    # body mit Fruchtname bereit stellen:
    body = {
        "INPUT": row['fruit_name_input']
    }
    async with session.post(url, headers=headers, json=body) as response:
        text = await response.text()
        return json.loads(text)


async def main():
    conn = aiohttp.TCPConnector(limit=None)
    session = aiohttp.ClientSession(connector=conn)

    conc_req = 3
    response = await gather_with_concurrency(conc_req, *[post_async(url, session, row) for row in records])
    await session.close()

    print(json.dumps(response, indent=4))  # convert in pretty json

    fruits = response
    # Wir gehen davon aus dass, obwohl die Anfragen asynchron ablaufen,
    # die richtige Reihenfolge der Antworten sicher gestellt ist:
    idx = 0
    for fruit in fruits:

        sql = f'''INSERT INTO {destination_table} (fruit_id, fruit_name_output)
                    VALUES (%s, %s)
        '''
        val = (
            # ID aus dem Record aus der Datenbank:
            str(records[idx]['fruit_id']),
            # Fruchtname aus der Antwort vom Server:
            str(fruit['OUTPUT']),
        )
        idx += 1

        db_config.cursor.execute(sql, val)
        db_config.connection.commit()

asyncio.get_event_loop().run_until_complete(main())

if db_config.connection.is_connected():
    db_config.connection.close()
    db_config.cursor.close()
    print("\n \n MySQL connection is closed")
Ich habe Kommentare in den Code eingetragen, damit ersichtlich ist, was ich gemacht habe.
Dieses:
Code:
response = await gather_with_concurrency(conc_req, *[post_async(url, session, row) for row in records])
wäre u. U. so schöner als den ganzen Datensatz zu übergeben:
Code:
response = await gather_with_concurrency(conc_req, *[post_async(url, session, row['fruit_name_input']) for row in records])
wobei natürlich die Funktion post_async angepasst werden müsste.
 

Sempervivum

Erfahrenes Mitglied
Oder alternativ die ID durch die Funktion post_async durchschleifen:
Code:
async def post_async(url, session, row):
    # body mit Fruchtname bereit stellen:
    body = {
        "INPUT": row['fruit_name_input']
    }
    async with session.post(url, headers=headers, json=body) as response:
        text = await response.text()
        decoded = json.loads(text)
        # ID in dekodierte Serverantwort eintragen:
        decoded['fruit_id'] = row['fruit_id']
        return decoded
Code:
    for fruit in fruits:

        sql = f'''INSERT INTO {destination_table} (fruit_id, fruit_name_output)
                    VALUES (%s, %s)
        '''
        val = (
            # ID aus der Serverantwort, sie wurde
            # beim Empfang dort hinzu gefügt:
            str(fruit['fruit_id']),
            # Fruchtname aus der Antwort vom Server:
            str(fruit['OUTPUT']),
        )
        db_config.cursor.execute(sql, val)
        db_config.connection.commit()