Python: Ergebnis von asynchronem HTTP POST Request verarbeiten

canju

Erfahrenes Mitglied
Hallo liebe Community,

ich fange gerade an Python zu lernen und versuche Daten aus einem externen API-Endpunkt abzurufen und das Ergebnis in eine csv Datei zu speichern (finales Ziel wird sein die Daten dann direkt in eine MariaDB zu importieren, aber zum Testen nutze ich erstmal eine .csv Datei).

Die Besonerheit hier ist, dass ich mehrere API-Calls (POST:HTTPS) zeitgleich asynchron aufrufen möchten, da eine synchrone Abfrage aufgrund der Datenmenge bis zu 8h dauert.


Mein Vorhaben:

1. CHECK - Datenbankverbindung aufbauen um benötigte POST-Parameter aus der DB beim API-Call zu übergeben
2. CHECK - Asynchrone HTTP POST Requests senden und ergebnis empfangen. Aktuell lasse ich drei Requests gleichzeitig abschicken (Credits). Anders als im Tutorial bleibt die Request-URL bei mir die gleiche aber der POST Body variiert bei mir. Daher hole ich mir die Post-Parameter aus einer bestehenden Datenbank.
3. HILFE! - :) Das Ergebnis aufbereiten und in eine .csv Datei speichern

Mein Code:
Python:
import db_config
import numpy

import asyncio
import aiohttp
import json

sql_select_Query = '''SELECT
                        some,
                        required,
                        parameters
                    FROM table ORDER BY created DESC LIMIT 3'''

db_config.cursor = db_config.connection.cursor(dictionary=True)
db_config.cursor.execute(sql_select_Query)
# get all records
records = db_config.cursor.fetchall()
print("Total number of rows in table: ", db_config.cursor.rowcount)

url = "https://webservice.com"
headers = {
  'Authorization': 'Bearer API_TOKEN',
  'Content-Type': 'application/json'
}

for row in records:
    body = {
      "some": row['some'],
      "required": row['required'],
      "parameters": row['parameters']
    }

file = open("response.csv", "w", encoding='utf-8')

async def gather_with_concurrency(n, *tasks):
    semaphore = asyncio.Semaphore(n)
    async def sem_task(task):
        async with semaphore:
            return await task

    return await asyncio.gather(*(sem_task(task) for task in tasks))


async def post_async(url, session, headers, body):
    async with session.post(url, headers=headers, json=body) as response:
        text = await response.text()
        return json.loads(text)

async def main():
    conn = aiohttp.TCPConnector(limit=None)
    session = aiohttp.ClientSession(connector=conn)

    conc_req = 3
    response = await gather_with_concurrency(conc_req, *[post_async(url, session, headers, body) for row in records])
    await session.close()

    #print(response)

    tariffs = json.loads(str(response))
    for tariff in tariffs:
        print(tariff['Id'])

    file.write(json.dumps(response, indent=4))

    #print(json.dumps(response, indent=4)) # convert in pretty json

asyncio.get_event_loop().run_until_complete(main())

file.close()

if db_config.connection.is_connected():
    db_config.connection.close()
    db_config.cursor.close()
    print("\n \n MySQL connection is closed")


print(response) (Zeilenümbrüche habe ich der lesbarkeit halber hinzugefügt):

Code:
[[
{"Active": true, "Id": 123, "Tariffs": [{"ContractDuation": 24.0, "Cost": {"InitialCost": 1479.8245, "MonthlyCost": 50.00}, "Valid": "2022-07-01T00:00:00"}], "Code": 54789},
{"Active": true, "Id": 124, "Tariffs":[{"ContractDuation": 12.0, "Cost":{"InitialCost": 864.1534, "MonthlyCost": 55.00}, "Valid": "2022-07-01T00:00:00"}{"ContractDuation": 24.0, "Cost":{"InitialCost": 11265.6524, "MonthlyCost": 45.00},"Valid": "2022-07-01T00:00:00"}],"CodeNumber": 62147}
]]


print(json.dumps(response, indent=4))

JSON:
[
    [
        {
            "Active": true,
            "Id": 123,
            "Tariffs":
            [
                {
                    "ContractDuation": 24.0,
                    "Cost":
                    {
                        "InitialCost": 1479.8245,
                        "MonthlyCost": 50.00
                    },
                    "Valid": "2022-07-01T00:00:00"
                }
            ],
            "Code": 54789
        },
        {
            "Active": true,
            "Id": 124,
            "Tariffs":
            [
                {
                    "ContractDuation": 12.0,
                    "Cost":
                    {
                        "InitialCost": 864.1534,
                        "MonthlyCost": 55.00
                    },
                    "Valid": "2022-07-01T00:00:00"
                }
                {
                    "ContractDuation": 24.0,
                    "Cost":
                    {
                        "InitialCost": 11265.6524,
                        "MonthlyCost": 45.00
                    },
                    "Valid": "2022-07-01T00:00:00"
                }
            ],
            "CodeNumber": 62147
        },
    ]
]

Ich versuche zunächst erstmal nur die jeweilige Id printen zu lassen, ich erhalte jedoch die Fehlermeldung:
Code:
Traceback (most recent call last):
  File "test.py", line 101, in <module>
    asyncio.get_event_loop().run_until_complete(main())
  File "/usr/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "test.py", line 90, in main
    tariffs = json.loads(str(response))
  File "/usr/lib/python3.8/json/__init__.py", line 357, in loads
    return _default_decoder.decode(s)
  File "/usr/lib/python3.8/json/decoder.py", line 337, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/usr/lib/python3.8/json/decoder.py", line 353, in raw_decode
    obj, end = self.scan_once(s, idx)
json.decoder.JSONDecodeError: Expecting property name enclosed in double quotes: line 1 column 3 (char 2)

Ich vermute irgendetwas mit der Schleife funzt nicht, bzw. versucht auf das falsche Objekt zuzugreifen?
Stehe hier auf dem Schlauch und bin dankbar für jede Hilfe.

Grüße,
canju
 
Mit asynchronen Requests in Python habe ich mich noch nicht beschäftigt, aber nach deinen Erklärungen sind die gar nicht das Problem sondern das Auswerten und Speichern in eine CSV-Datei, nicht wahr? D. h. es geht nur noch darum, dieses:
Code:
    tariffs = json.loads(str(response))
    for tariff in tariffs:
        print(tariff['Id'])

    file.write(json.dumps(response, indent=4))
so umzuändern, dass die Antwort in eine CSV-Datei geschrieben wird?
 
PS: Ich habe da noch etwas genauer hin gesehen und da sind ja zwei Arrays verschachtelt. D. h. Du müsstest es so umformulieren:
Code:
    for tariff in tariffs[0]:
        print(tariff['Id'])
 
Hey Sempervivum,
freut mich dich wieder zu "sehen", hoffe dir geht es gut.
Mit asynchronen Requests in Python habe ich mich noch nicht beschäftigt, aber nach deinen Erklärungen sind die gar nicht das Problem sondern das Auswerten und Speichern in eine CSV-Datei, nicht wahr?
Genau, das zeitgleiche absenden der HTTP-Requests scheint zu funzen. Wenn ich mir mit watch -n1 lsof -i TCP:80,443 die anzahl der ausgehenden Request anschaue, sehe ich die gewünschten 3 zeitgleichen requests. Kann ich auch jedem empfehlen, das verringert die Wartezeit bei großen Datenmengen / vielen Abfragen erheblich (Aber nicht übertreiben, um den Zielserver nicht zu "DDoSen").
Das:
Python:
    for tariff in tariffs[0]:
        print(tariff['Id'])

hatte ich in der Tat auch schon probiert und erhalte:
Code:
Traceback (most recent call last):
  File "test.py", line 101, in <module>
    asyncio.get_event_loop().run_until_complete(main())
  File "/usr/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "test.py", line 90, in main
    tariffs = json.loads(str(response))
  File "/usr/lib/python3.8/json/__init__.py", line 357, in loads
    return _default_decoder.decode(s)
  File "/usr/lib/python3.8/json/decoder.py", line 337, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/usr/lib/python3.8/json/decoder.py", line 353, in raw_decode
    obj, end = self.scan_once(s, idx)
json.decoder.JSONDecodeError: Expecting property name enclosed in double quotes: line 1 column 4 (char 3)

Randnotiz: Das str() in tariffs = json.loads(str(response)) habe ich auch erst drum gesetzt als ich immer diese Meldung erhalten hatte:

Code:
Traceback (most recent call last):
  File "test.py", line 101, in <module>
    asyncio.get_event_loop().run_until_complete(main())
  File "/usr/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "test.py", line 90, in main
    tariffs = json.loads(response)
  File "/usr/lib/python3.8/json/__init__.py", line 341, in loads
    raise TypeError(f'the JSON object must be str, bytes or bytearray, '
TypeError: the JSON object must be str, bytes or bytearray, not list
 
Scheint doch ein wenig komplizierter zu sein. Die Fehlermeldungen weisen eher in die Richtung, dass schon beim Dekodieren bzw. Parsen ein Problem auftritt.

Wenn dieses:
Code:
print(json.dumps(response, indent=4))
so einwandfrei funktioniert, würde ich vermuten, dass response kein JSON-String ist, sondern ein Objekt, das schon dekodiert ist. Versuche doch mal den Typ von response ausgeben zu lassen und ob die Schleife funktioniert, wenn du das json.loads weg lässt.
 
Hast recht und mir eine Kopfschmerztablette erspart, lieben Dank :)
print(type(response)) = <class 'list'>

Die Id's werden jetzt alles richtig ausgegeben. Ich muss zugeben die Bennung der Schleife ist ein wenig verwirrend weil jetzt so oft tariff und Tariffs drin vorkommt und ich musste die Produktivdaten hier durch fiktive ersetzen, bitte verzeih mir. Ich habe aber jetzt die Schleife um eine erweitert damit ich auch alles innerhalb von Tariffs erhalte:
Python:
    tariffs = response
    for tariff in tariffs[0]:
        print(tariff['Id'])
        for Tariffs in tariff['Tariffs']:
            print(Tariffs["ContractDuation"])

Hast du noch einen Tipp, wie ich das jetzt Zeile für Zeile in die csv-Datei schreiben kann?
 
Sorry fürs verzögerte Feedback. Habe den Umweg über die csv Datei nach einigem Ausprobieren jetzt verworfen und hole die Werte direkt in die Datenbank.

Eine sache kriege ich aber nicht hin. Und zwar hole ich mir die benötigten Post-Parameter aus einer bestehenden DB:
Python:
for row in records:
    body = {
      "some": row['some'],
      "required": row['required'],
      "parameters": row['parameters']
    }
Die dann in dieser Zeile:
Python:
response = await gather_with_concurrency(conc_req, *[post_async(url, session, headers, body) for row in records])
Entsprechend beim request übergeben werden. Das funzt soweit.

Ich habe jedoch neben den erforderlichen Body-Parametern in records hier auch noch interne Referenznummern (bspw. UserID) enthalten, die ich zu jedem Ergebniss dazu benötige, um später eine Zuordnung vornehmen zu können.

Hast du eine Idee wie ich die interne Referenz mit bis zu:
Python:
    tariffs = response
    for tariff in tariffs[0]:
        print(tariff['Id'])
        for Tariffs in tariff['Tariffs']:
            print(Tariffs["ContractDuation"]) # <-- Also bis zu dieser Ebene hier

durchgeschliffen bekomme um diese dann im anschließenden INSERT mit in die DB zu schreiben?
 
Wo kommen diese IDs denn her? Wenn sie in einer globalen Variable stehen, würde ich erwarten, dass sie in jedem Fall verfügbar sind, egal wie tief Du in einer Schleife drin bist.
 
Die stammen auch aus:
Python:
sql_select_Query = '''SELECT
                        some,
                        required,
                        parameters
                    FROM table ORDER BY created DESC LIMIT 3'''

db_config.cursor = db_config.connection.cursor(dictionary=True)
db_config.cursor.execute(sql_select_Query)
# get all records
records = db_config.cursor.fetchall()

Wenn ich die schleifen erweitere um:
Python:
    tariffs = response
    for tariff in tariffs[0]:
        print(tariff['Id'])
        for Tariffs in tariff['Tariffs']:
            print(Tariffs["ContractDuation"])
            
            for row2 in records: # <--
                print(row2['user_id']) # <--

werden mir aber mehr user_ids ausgegeben, als es sein dürften. ich weiß dass ich 3 Anfragen abgeschickt habe, die mir insgesamt 92 Tarife zurückgeben (klappt soweit), jeder dieser Tarife soll mit der user_id versehen werden. Sobald ich aber die user_id ins spiel bringe erhalte ich plötzlich 192 einträge in der DB.
 
Zuletzt bearbeitet:
Zurück