Skip to content

Module: node

Module containing the NodeClient class, which is a client for the Infernet Node's REST API.

Example Usage

You can use the Node client as follows:

from infernet_client import NodeClient

client = NodeClient("http://localhost:8000")

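# All NodeClient methods are coroutines: await them from async code
# (for example inside a function run with asyncio.run).

# Check the node's health
await client.health()

# Get information about the node
await client.get_info()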

NodeClient

Source code in src/infernet_client/node.py
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
class NodeClient:
    """Asynchronous client for the Infernet Node's REST API.

    Every public method opens a short-lived ``aiohttp.ClientSession``, performs a
    single HTTP request against ``base_url`` and returns the decoded JSON payload.
    """

    def __init__(self, base_url: str):
        """Initializes the client

        Args:
            base_url (str): The base URL of the REST server. Trailing slashes are
                stripped so that endpoint paths can be appended without producing
                a double slash.

        """
        self.base_url = base_url.rstrip("/")

    @staticmethod
    def _api_error(status: int, body: Any) -> APIError:
        """Builds an APIError from an HTTP status and a decoded JSON error body.

        Non-dict bodies (e.g. a JSON list) carry no "error" / "params" keys, so
        the defaults are used instead of failing on ``.get``.
        """
        details = body if isinstance(body, dict) else {}
        return APIError(
            status,
            details.get("error", "Unknown error"),
            details.get("params", None),
        )

    async def _get_json(self, path: str, timeout: int, params: Any = None) -> Any:
        """GETs ``path`` and returns the decoded JSON body.

        Raises:
            aiohttp.ClientResponseError: If the server returns an error code
            asyncio.TimeoutError: If the request times out
        """
        async with ClientSession() as session:
            async with session.get(
                f"{self.base_url}{path}",
                params=params,
                timeout=ClientTimeout(total=timeout),
            ) as response:
                response.raise_for_status()
                return await response.json()

    async def _send_json(
        self, method: str, path: str, timeout: int, **kwargs: Any
    ) -> Any:
        """Sends a request and returns the JSON body, mapping HTTP errors to APIError.

        The body is decoded before the status check because the error details
        ("error", "params") are read from it.

        Raises:
            APIError: If the server returns an error code
            asyncio.TimeoutError: If the request times out
        """
        async with ClientSession() as session:
            async with session.request(
                method,
                f"{self.base_url}{path}",
                timeout=ClientTimeout(total=timeout),
                **kwargs,
            ) as response:
                body = await response.json()
                try:
                    response.raise_for_status()
                except ClientResponseError as e:
                    raise self._api_error(e.status, body) from e
                return body

    async def health(self, timeout: int = 1) -> bool:
        """Server health check

        Args:
            timeout (int, optional): The timeout for the health check. Defaults to 1.

        Returns:
            bool: True if the server is healthy, False otherwise

        Raises:
            aiohttp.ClientResponseError: If the health check returns an error code
            asyncio.TimeoutError: If the health check times out
        """

        body = cast(HealthInfo, await self._get_json("/health", timeout))
        return body["status"] == "healthy"

    async def get_info(self, timeout: int = 1) -> NodeInfo:
        """Retrieves node info

        Fetches containers running on this node, the number of jobs pending, and chain
        information.

        Args:
            timeout (int, optional): The timeout for the request. Defaults to 1.

        Returns:
            NodeInfo: The node info object

        Raises:
            aiohttp.ClientResponseError: If the request returns an error code
            asyncio.TimeoutError: If the request times out
        """

        return cast(NodeInfo, await self._get_json("/info", timeout))

    async def get_resources(self, timeout: int = 1) -> dict[str, ServiceResources]:
        """Collects container resources on the node

        Args:
            timeout (int, optional): The timeout for the request. Defaults to 1.

        Returns:
            dict[str, ServiceResources]: Returns full resources of each container.
            See infernet-ml.docs.ritual.net/reference/infernet_ml/utils/spec/#infernet_ml.utils.spec.ServiceResources for details.

        Raises:
            aiohttp.ClientResponseError: If the request returns an error code
            asyncio.TimeoutError: If the request times out
        """  # noqa: E501

        return cast(
            dict[str, ServiceResources], await self._get_json("/resources", timeout)
        )

    async def check_model_support(
        self, model_id: str, timeout: int = 1
    ) -> dict[str, ModelSupport]:
        """Checks model support on each container

        Args:
            model_id (str): The ID of the model to check support for
            timeout (int, optional): The timeout for the request. Defaults to 1.

        Returns:
            dict[str, ModelSupport]: Returns support of the model by container.

        Raises:
            aiohttp.ClientResponseError: If the request returns an error code
            asyncio.TimeoutError: If the request times out
        """

        # Passed via `params` so characters such as "&", "#" or "+" inside the
        # model ID are percent-encoded instead of corrupting the query string.
        body = await self._get_json(
            "/resources", timeout, params={"model_id": model_id}
        )
        return cast(dict[str, ModelSupport], body)

    async def request_job(self, job: JobRequest, timeout: int = 5) -> JobID:
        """Requests an asynchronous job

        Returns the job ID if the request is successful. Otherwise, raises an exception.
        Job status and results can be retrieved asynchronously using the job ID.

        Args:
            job (JobRequest): The job request
            timeout (int, optional): The timeout for the request. Defaults to 5.

        Returns:
            JobID: The ID of the job

        Raises:
            APIError: If the request returns an error code
            asyncio.TimeoutError: If the request times out
        """

        body = await self._send_json("POST", "/api/jobs", timeout, json=job)
        return cast(JobID, body["id"])

    async def request_jobs(
        self, jobs: list[JobRequest], timeout: int = 10
    ) -> list[Union[JobResponse, ErrorResponse]]:
        """Requests asynchronous jobs in batch

        For each job request, the server returns a job ID if the request is successful,
        or an error response otherwise. Job status and results can be retrieved
        asynchronously using the job ID.

        !!! note
            The order of the responses corresponds to the order of the job requests. It
            is the responsibility of the caller to match the responses with the requests,
            and handle errors appropriately.

        Args:
            jobs (list[JobRequest]): The list of job requests
            timeout (int, optional): The timeout for the request. Defaults to 10.

        Returns:
            list[Union[JobResponse, ErrorResponse]]: The list of job IDs or error
            responses for each job request.

        Raises:
            APIError: If the request returns an error code
            asyncio.TimeoutError: If the request times out
        """

        body = await self._send_json("POST", "/api/jobs/batch", timeout, json=jobs)
        return cast(list[Union[JobResponse, ErrorResponse]], body)

    async def get_job_result_sync(
        self, job_id: JobID, retries: int = 5, timeout: int = 5
    ) -> Optional[JobResult]:
        """Retrieves job result synchronously

        Repeatedly polls the server for the job result until the job is no longer
        running or the maximum number of retries is reached.

        Args:
            job_id (JobID): The job ID
            retries (int, optional): The maximum number of polls. A 1 second pause
                is inserted between consecutive polls. Defaults to 5.
            timeout (int, optional): The timeout for each request. Defaults to 5.

        Returns:
            Optional[JobResult]: The job result (whatever its final status), or
            None if the job is not found

        Raises:
            APIError: If a polling request returns an error code
            asyncio.TimeoutError: If a polling request times out
            TimeoutError: If the job is still running after the maximum number of
                retries
        """

        for attempt in range(retries):
            results = await self.get_job_results([job_id], timeout=timeout)

            # If the job is not found, return None
            if not results:
                return None

            if results[0]["status"] != "running":
                return results[0]

            # Pause only when another poll follows; sleeping after the final
            # poll would just delay the TimeoutError below.
            if attempt < retries - 1:
                await sleep(1)

        raise TimeoutError(f"Job result not available after {retries} retries")

    async def get_job_results(
        self, job_ids: list[JobID], intermediate: bool = False, timeout: int = 5
    ) -> list[JobResult]:
        """Retrieves job results

        Args:
            job_ids (list[JobID]): The list of job IDs
            intermediate (bool, optional): Whether to return intermediate results (only
                applicable for when multiple containers are chained). Defaults to False.
            timeout (int, optional): The timeout for the request. Defaults to 5.

        Returns:
            list[JobResult]: The list of job results

        Raises:
            APIError: If the request returns an error code
            asyncio.TimeoutError: If the request times out
        """

        # One "id" query parameter per job, percent-encoded by aiohttp. An empty
        # list still sends a single empty "id" so the request stays a results
        # query, exactly as the hand-built "?id=" URL did.
        params = [("id", job_id) for job_id in job_ids] or [("id", "")]
        if intermediate:
            params.append(("intermediate", "true"))

        body = await self._send_json("GET", "/api/jobs", timeout, params=params)
        return cast(list[JobResult], body)

    async def get_jobs(
        self, pending: Optional[bool] = None, timeout: int = 5
    ) -> list[JobID]:
        """Retrieves a list of job IDs for this client

        Args:
            pending (Optional[bool], optional): If True, only pending jobs are returned.
                If False, only completed jobs are returned. By default, all jobs are
                returned.
            timeout (int, optional): The timeout for the request. Defaults to 5.

        Returns:
            list[JobID]: The list of job IDs

        Raises:
            aiohttp.ClientResponseError: If the request returns an error code
            asyncio.TimeoutError: If the request times out
        """

        # The filter is only sent when explicitly requested; the value is the
        # lowercase literal "true" / "false".
        params = None if pending is None else {"pending": str(pending).lower()}
        return cast(
            list[JobID], await self._get_json("/api/jobs", timeout, params=params)
        )

    async def request_stream(
        self, job: JobRequest, timeout: int = 180
    ) -> AsyncGenerator[Union[str, bytes], None]:
        """Requests a streaming job

        Args:
            job (JobRequest): The streaming job request
            timeout (int, optional): The timeout for the request. Since this is a
                synchronous request, the timeout is the maximum time to wait for the
                server to finish streaming the response. Defaults to 180.

        Yields:
            Union[str, bytes]: The job ID (as str) followed by the output of the
            job (as raw bytes chunks)

        Raises:
            APIError: If the server responds with a status other than 200
            asyncio.TimeoutError: If the request times out
        """

        url = f"{self.base_url}/api/jobs/stream"
        async with ClientSession() as session:
            async with session.post(
                url,
                json=job,
                timeout=ClientTimeout(total=timeout),
            ) as response:
                if response.status != 200:
                    body = await response.json()
                    raise self._api_error(response.status, body)

                # The first chunk of the response is treated as the job ID.
                # NOTE(review): iter_any() yields whatever bytes are available,
                # so this assumes the server flushes the ID as its own chunk —
                # confirm against the node's streaming endpoint.
                job_id: Optional[str] = None
                async for chunk in response.content.iter_any():
                    if not job_id:
                        job_id = chunk.decode("utf-8").strip()
                        yield job_id
                    else:
                        yield chunk

    async def record_status(
        self, id: JobID, status: JobStatus, job: JobRequest, timeout: int = 5
    ) -> None:
        """Manually records the status of a job with the node.

        NOTE: DO NOT USE THIS FUNCTION IF YOU DON'T KNOW WHAT YOU'RE DOING.

        Args:
            id (JobID): The ID of the job
            status (JobStatus): The status of the job
            job (JobRequest): The job request
            timeout (int, optional): The timeout for the request. Defaults to 5.

        Raises:
            APIError: If the job status cannot be recorded (the request returns
                an error code)
            asyncio.TimeoutError: If the request times out
        """

        # The job request's fields are sent alongside "id" and "status" in one
        # flat JSON object.
        await self._send_json(
            "PUT",
            "/api/status",
            timeout,
            json={
                "id": id,
                "status": status,
                **job,
            },
        )

    async def request_delegated_subscription(
        self,
        subscription: Subscription,
        rpc: RPC,
        coordinator_address: ChecksumAddress,
        expiry: int,
        nonce: int,
        private_key: str,
        data: dict[str, Any],
        timeout: int = 5,
    ) -> None:
        """Creates a new delegated subscription

        Signs the subscription's typed data with the given private key and posts
        the signature, the serialized subscription and the input data to the node.

        Args:
            subscription (Subscription): The subscription object
            rpc (RPC): The RPC client
            coordinator_address (ChecksumAddress): The coordinator contract address
            expiry (int): The expiry of the subscription, in seconds (UNIX timestamp)
            nonce (int): The nonce of the subscription signing
            private_key (str): The private key of the subscriber
            data (dict[str, Any]): The input data for the first container
            timeout (int, optional): The timeout for the request. Defaults to 5.

        Raises:
            APIError: If the request returns an error code
            asyncio.TimeoutError: If the request times out
        """

        chain_id = await rpc.get_chain_id()

        typed_data = subscription.get_delegate_subscription_typed_data(
            nonce,
            expiry,
            chain_id,
            coordinator_address,
        )
        signed_message = Account.sign_message(typed_data, private_key)

        await self._send_json(
            "POST",
            "/api/jobs",
            timeout,
            json={
                "signature": {
                    "nonce": nonce,
                    "expiry": expiry,
                    "v": signed_message.v,
                    "r": int(signed_message.r),
                    "s": int(signed_message.s),
                },
                "subscription": subscription.serialized,
                "data": data,
            },
        )

__init__(base_url)

Initializes the client

Parameters:

Name Type Description Default
base_url str

The base URL of the REST server

required
Source code in src/infernet_client/node.py
def __init__(self, base_url: str):
    """Initializes the client

    Args:
        base_url (str): The base URL of the REST server. Trailing slashes are
            stripped so that endpoint paths such as "/health" can be appended
            without producing a double slash.

    """
    self.base_url = base_url.rstrip("/")

check_model_support(model_id, timeout=1) async

Checks model support on each container

Parameters:

Name Type Description Default
model_id str

The ID of the model to check support for

required
timeout int

The timeout for the request. Defaults to 1.

1

Returns:

Type Description
dict[str, ModelSupport]

dict[str, ModelSupport]: Returns support of the model by container.

Raises:

Type Description
ClientResponseError

If the request returns an error code

TimeoutError

If the request times out

Source code in src/infernet_client/node.py
async def check_model_support(
    self, model_id: str, timeout: int = 1
) -> dict[str, ModelSupport]:
    """Checks model support on each container

    Args:
        model_id (str): The ID of the model to check support for
        timeout (int, optional): The timeout for the request. Defaults to 1.

    Returns:
        dict[str, ModelSupport]: Returns support of the model by container.

    Raises:
        aiohttp.ClientResponseError: If the request returns an error code
        asyncio.TimeoutError: If the request times out
    """

    url = f"{self.base_url}/resources"
    async with ClientSession() as session:
        async with session.get(
            url,
            # Passed via `params` so characters such as "&", "#" or "+" inside
            # the model ID are percent-encoded instead of corrupting the query.
            params={"model_id": model_id},
            timeout=ClientTimeout(total=timeout),
        ) as response:
            response.raise_for_status()
            return cast(dict[str, ModelSupport], await response.json())

get_info(timeout=1) async

Retrieves node info

Fetches containers running on this node, the number of jobs pending, and chain information.

Parameters:

Name Type Description Default
timeout int

The timeout for the request. Defaults to 1.

1

Returns:

Name Type Description
NodeInfo NodeInfo

The node info object

Raises:

Type Description
ClientResponseError

If the request returns an error code

TimeoutError

If the request times out

Source code in src/infernet_client/node.py
async def get_info(self, timeout: int = 1) -> NodeInfo:
    """Fetch the node's info document from the ``/info`` endpoint.

    Per the API, this covers the containers running on the node, the number of
    pending jobs, and chain information.

    Args:
        timeout (int, optional): Total time allowed for the request, in seconds.
            Defaults to 1.

    Returns:
        NodeInfo: The decoded JSON response.

    Raises:
        aiohttp.ClientResponseError: If the node answers with an error status.
        asyncio.TimeoutError: If the request exceeds ``timeout``.
    """

    async with ClientSession() as http, http.get(
        f"{self.base_url}/info", timeout=ClientTimeout(total=timeout)
    ) as resp:
        # Fail fast on 4xx/5xx before attempting to decode the body.
        resp.raise_for_status()
        payload = await resp.json()
        return cast(NodeInfo, payload)

get_job_result_sync(job_id, retries=5, timeout=5) async

Retrieves job result synchronously

Repeatedly polls the server for the job result until the job is no longer running or the maximum number of retries is reached.

Parameters:

Name Type Description Default
job_id JobID

The job ID

required
retries int

The number of retries if the job is still running. Each retry waits for 1 second before polling again. Defaults to 5.

5
timeout int

The timeout for the request. Defaults to 5.

5

Returns:

Type Description
Optional[JobResult]

Optional[JobResult]: The job result, or None if the job is not found

Raises:

Type Description
APIError

If the request returns an error code

TimeoutError

If the request times out

TimeoutError

If the job result is not available after the maximum number of retries

Source code in src/infernet_client/node.py
async def get_job_result_sync(
    self, job_id: JobID, retries: int = 5, timeout: int = 5
) -> Optional[JobResult]:
    """Retrieves job result synchronously

    Repeatedly polls the server for the job result until the job is no longer
    running or the maximum number of retries is reached.

    Args:
        job_id (JobID): The job ID
        retries (int, optional): The maximum number of polls. A 1 second pause
            is inserted between consecutive polls. Defaults to 5.
        timeout (int, optional): The timeout for each request. Defaults to 5.

    Returns:
        Optional[JobResult]: The job result (whatever its final status), or
        None if the job is not found

    Raises:
        APIError: If a polling request returns an error code
        asyncio.TimeoutError: If a polling request times out
        TimeoutError: If the job is still running after the maximum number of
            retries
    """

    for attempt in range(retries):
        results = await self.get_job_results([job_id], timeout=timeout)

        # If the job is not found, return None
        if not results:
            return None

        if results[0]["status"] != "running":
            return results[0]

        # Pause only when another poll follows; sleeping after the final poll
        # would just delay the TimeoutError below.
        if attempt < retries - 1:
            await sleep(1)

    raise TimeoutError(f"Job result not available after {retries} retries")

get_job_results(job_ids, intermediate=False, timeout=5) async

Retrieves job results

Parameters:

Name Type Description Default
job_ids list[JobID]

The list of job IDs

required
intermediate bool

Whether to return intermediate results (only applicable for when multiple containers are chained). Defaults to False.

False
timeout int

The timeout for the request. Defaults to 5.

5

Returns:

Type Description
list[JobResult]

list[JobResult]: The list of job results

Raises:

Type Description
ClientResponseError

If the request returns an error code

TimeoutError

If the request times out

Source code in src/infernet_client/node.py
async def get_job_results(
    self, job_ids: list[JobID], intermediate: bool = False, timeout: int = 5
) -> list[JobResult]:
    """Retrieves job results

    Args:
        job_ids (list[JobID]): The list of job IDs
        intermediate (bool, optional): Whether to return intermediate results (only
            applicable for when multiple containers are chained). Defaults to False.
        timeout (int, optional): The timeout for the request. Defaults to 5.

    Returns:
        list[JobResult]: The list of job results

    Raises:
        APIError: If the request returns an error code
        asyncio.TimeoutError: If the request times out
    """

    url = f"{self.base_url}/api/jobs"
    # One "id" query parameter per job, percent-encoded by aiohttp. An empty
    # list still sends a single empty "id" so the request stays a results
    # query, exactly as the hand-built "?id=" URL did.
    params = [("id", job_id) for job_id in job_ids] or [("id", "")]
    if intermediate:
        params.append(("intermediate", "true"))

    async with ClientSession() as session:
        async with session.get(
            url, params=params, timeout=ClientTimeout(total=timeout)
        ) as response:
            # Decoded before the status check: error details come from the body.
            body = await response.json()
            try:
                response.raise_for_status()
            except ClientResponseError as e:
                # A non-dict error body has no "error" / "params" keys to read.
                details = body if isinstance(body, dict) else {}
                raise APIError(
                    e.status,
                    details.get("error", "Unknown error"),
                    details.get("params", None),
                ) from e
            return cast(list[JobResult], body)

get_jobs(pending=None, timeout=5) async

Retrieves a list of job IDs for this client

Parameters:

Name Type Description Default
pending Optional[bool]

If True, only pending jobs are returned. If False, only completed jobs are returned. By default, all jobs are returned.

None
timeout int

The timeout for the request. Defaults to 5.

5

Returns:

Type Description
list[JobID]

list[JobID]: The list of job IDs

Raises:

Type Description
ClientResponseError

If the request returns an error code

TimeoutError

If the request times out

Source code in src/infernet_client/node.py
async def get_jobs(
    self, pending: Optional[bool] = None, timeout: int = 5
) -> list[JobID]:
    """List the job IDs the node reports for this client.

    Args:
        pending (Optional[bool], optional): Filter sent to the node as the
            ``pending`` query parameter. Per the API, True selects only pending
            jobs and False only completed ones. When None (the default) no
            filter is sent and all jobs are returned.
        timeout (int, optional): Total time allowed for the request, in seconds.
            Defaults to 5.

    Returns:
        list[JobID]: The decoded JSON response.

    Raises:
        aiohttp.ClientResponseError: If the node answers with an error status.
        asyncio.TimeoutError: If the request exceeds ``timeout``.
    """

    endpoint = f"{self.base_url}/api/jobs"
    if pending is not None:
        # Booleans are sent as the lowercase literals "true" / "false".
        endpoint = f"{endpoint}?pending={str(pending).lower()}"

    async with ClientSession() as http, http.get(
        endpoint, timeout=ClientTimeout(total=timeout)
    ) as resp:
        resp.raise_for_status()
        job_ids = await resp.json()
        return cast(list[JobID], job_ids)

get_resources(timeout=1) async

Collects container resources on the node

Returns:

Type Description
dict[str, ServiceResources]

dict[str, ServiceResources]: Returns full resources of each container.

dict[str, ServiceResources]

See infernet-ml.docs.ritual.net/reference/infernet_ml/utils/spec/#infernet_ml.utils.spec.ServiceResources for details.

Raises:

Type Description
ClientResponseError

If the request returns an error code

TimeoutError

If the request times out

Source code in src/infernet_client/node.py
async def get_resources(self, timeout: int = 1) -> dict[str, ServiceResources]:
    """Fetch the resources of every container from the ``/resources`` endpoint.

    Args:
        timeout (int, optional): Total time allowed for the request, in seconds.
            Defaults to 1.

    Returns:
        dict[str, ServiceResources]: Full resources of each container.
        See infernet-ml.docs.ritual.net/reference/infernet_ml/utils/spec/#infernet_ml.utils.spec.ServiceResources for details.

    Raises:
        aiohttp.ClientResponseError: If the node answers with an error status.
        asyncio.TimeoutError: If the request exceeds ``timeout``.
    """  # noqa: E501

    async with ClientSession() as http, http.get(
        f"{self.base_url}/resources", timeout=ClientTimeout(total=timeout)
    ) as resp:
        # Fail fast on 4xx/5xx before attempting to decode the body.
        resp.raise_for_status()
        resources = await resp.json()
        return cast(dict[str, ServiceResources], resources)

health(timeout=1) async

Server health check

Parameters:

Name Type Description Default
timeout int

The timeout for the health check. Defaults to 1.

1

Returns:

Name Type Description
bool bool

True if the server is healthy, False otherwise

Raises:

Type Description
ClientResponseError

If the health check returns an error code

TimeoutError

If the health check times out

Source code in src/infernet_client/node.py
async def health(self, timeout: int = 1) -> bool:
    """Ask the node's ``/health`` endpoint whether it is healthy.

    Args:
        timeout (int, optional): Total time allowed for the health check, in
            seconds. Defaults to 1.

    Returns:
        bool: True when the response's "status" field equals "healthy",
        False for any other status value.

    Raises:
        aiohttp.ClientResponseError: If the node answers with an error status.
        asyncio.TimeoutError: If the health check exceeds ``timeout``.
    """

    async with ClientSession() as http, http.get(
        f"{self.base_url}/health", timeout=ClientTimeout(total=timeout)
    ) as resp:
        # Error statuses raise instead of being reported as "unhealthy".
        resp.raise_for_status()
        report = cast(HealthInfo, await resp.json())
        is_healthy = report["status"] == "healthy"
        return is_healthy

record_status(id, status, job, timeout=5) async

Manually records the status of a job with the node.

NOTE: DO NOT USE THIS FUNCTION IF YOU DON'T KNOW WHAT YOU'RE DOING.

Parameters:

Name Type Description Default
id JobID

The ID of the job

required
status JobStatus

The status of the job

required
job JobRequest

The job request

required
timeout int

The timeout for the request. Defaults to 5.

5

Raises:

Type Description
Exception

If the job status cannot be recorded

Source code in src/infernet_client/node.py
async def record_status(
    self, id: JobID, status: JobStatus, job: JobRequest, timeout: int = 5
) -> None:
    """Manually record a job's status with the node via ``PUT /api/status``.

    NOTE: DO NOT USE THIS FUNCTION IF YOU DON'T KNOW WHAT YOU'RE DOING.

    Args:
        id (JobID): The ID of the job
        status (JobStatus): The status to record for the job
        job (JobRequest): The job request; its fields are sent alongside
            ``id`` and ``status`` in one flat JSON object
        timeout (int, optional): Total time allowed for the request, in seconds.
            Defaults to 5.

    Raises:
        APIError: If the node answers with an error status, i.e. the job
            status could not be recorded
        asyncio.TimeoutError: If the request exceeds ``timeout``
    """

    payload = {"id": id, "status": status, **job}

    async with ClientSession() as http, http.put(
        f"{self.base_url}/api/status",
        json=payload,
        timeout=ClientTimeout(total=timeout),
    ) as resp:
        # Decode first: the error details below are read from the body.
        reply = await resp.json()
        try:
            resp.raise_for_status()
        except ClientResponseError as exc:
            message = reply.get("error", "Unknown error")
            raise APIError(exc.status, message, reply.get("params", None)) from exc

request_delegated_subscription(subscription, rpc, coordinator_address, expiry, nonce, private_key, data, timeout=5) async

Creates a new delegated subscription

Parameters:

Name Type Description Default
subscription Subscription

The subscription object

required
rpc RPC

The RPC client

required
coordinator_address ChecksumAddress

The coordinator contract address

required
expiry int

The expiry of the subscription, in seconds (UNIX timestamp)

required
nonce int

The nonce of the subscription signing

required
private_key str

The private key of the subscriber

required
data dict[str, Any]

The input data for the first container

required
timeout int

The timeout for the request. Defaults to 5.

5

Raises:

Type Description
APIError

If the request returns an error code

Source code in src/infernet_client/node.py
async def request_delegated_subscription(
    self,
    subscription: Subscription,
    rpc: RPC,
    coordinator_address: ChecksumAddress,
    expiry: int,
    nonce: int,
    private_key: str,
    data: dict[str, Any],
    timeout: int = 5,
) -> None:
    """Sign a delegated subscription and submit it to the node.

    The chain ID is fetched from ``rpc``, the subscription's typed data is
    signed with ``private_key``, and the signature, serialized subscription
    and input data are POSTed to the node's ``/api/jobs`` endpoint.

    Args:
        subscription (Subscription): The subscription object
        rpc (RPC): The RPC client, used to look up the chain ID
        coordinator_address (ChecksumAddress): The coordinator contract address
        expiry (int): The expiry of the subscription, in seconds (UNIX timestamp)
        nonce (int): The nonce of the subscription signing
        private_key (str): The private key of the subscriber
        data (dict[str, Any]): The input data for the first container
        timeout (int, optional): The timeout for the request. Defaults to 5.

    Raises:
        APIError: If the request returns an error code
    """

    chain_id = await rpc.get_chain_id()

    # Sign the typed data describing this delegated subscription
    signature = Account.sign_message(
        subscription.get_delegate_subscription_typed_data(
            nonce,
            expiry,
            chain_id,
            coordinator_address,
        ),
        private_key,
    )

    endpoint = f"{self.base_url}/api/jobs"
    payload = {
        "signature": {
            "nonce": nonce,
            "expiry": expiry,
            "v": signature.v,
            "r": int(signature.r),
            "s": int(signature.s),
        },
        "subscription": subscription.serialized,
        "data": data,
    }

    async with ClientSession() as http, http.post(
        endpoint,
        json=payload,
        timeout=ClientTimeout(total=timeout),
    ) as resp:
        # Read the body first so error details are available for APIError
        result = await resp.json()
        try:
            resp.raise_for_status()
        except ClientResponseError as err:
            raise APIError(
                err.status,
                result.get("error", "Unknown error"),
                result.get("params", None),
            ) from err

request_job(job, timeout=5) async

Requests an asynchronous job

Returns the job ID if the request is successful. Otherwise, raises an exception. Job status and results can be retrieved asynchronously using the job ID.

Parameters:

Name Type Description Default
job JobRequest

The job request

required
timeout int

The timeout for the request. Defaults to 5.

5

Returns:

Name Type Description
JobID JobID

The ID of the job

Raises:

Type Description
APIError

If the request returns an error code

TimeoutError

If the request times out

Source code in src/infernet_client/node.py
async def request_job(self, job: JobRequest, timeout: int = 5) -> JobID:
    """Requests an asynchronous job

    Returns the job ID if the request is successful. Otherwise, raises an exception.
    Job status and results can be retrieved asynchronously using the job ID.

    Args:
        job (JobRequest): The job request
        timeout (int, optional): The timeout for the request. Defaults to 5.

    Returns:
        JobID: The ID of the job

    Raises:
        APIError: If the request returns an error code
        asyncio.TimeoutError: If the request times out
    """

    url = f"{self.base_url}/api/jobs"
    async with ClientSession() as session:
        async with session.post(
            url,
            json=job,
            timeout=ClientTimeout(total=timeout),
        ) as response:
            # Read the body up front: it carries either the job ID or the
            # error details that are forwarded in APIError.
            body = await response.json()
            try:
                response.raise_for_status()
            except ClientResponseError as e:
                raise APIError(
                    e.status,
                    body.get("error", "Unknown error"),
                    body.get("params", None),
                ) from e
            return cast(JobID, body["id"])

request_jobs(jobs, timeout=10) async

Requests asynchronous jobs in batch

For each job request, the server returns a job ID if the request is successful, or an error response otherwise. Job status and results can be retrieved asynchronously using the job ID.

Note

The order of the responses corresponds to the order of the job requests. It is the responsibility of the caller to match the responses with the requests, and handle errors appropriately.

Parameters:

Name Type Description Default
jobs list[JobRequest]

The list of job requests

required
timeout int

The timeout for the request. Defaults to 10.

10

Returns:

Type Description
list[Union[JobResponse, ErrorResponse]]

The list of job IDs or error responses for each job request.

Raises:

Type Description
APIError

If the request returns an error code

TimeoutError

If the request times out

Source code in src/infernet_client/node.py
async def request_jobs(
    self, jobs: list[JobRequest], timeout: int = 10
) -> list[Union[JobResponse, ErrorResponse]]:
    """Requests asynchronous jobs in batch

    For each job request, the server returns a job ID if the request is successful,
    or an error response otherwise. Job status and results can be retrieved
    asynchronously using the job ID.

    !!! note
        The order of the responses corresponds to the order of the job requests. It
        is the responsibility of the caller to match the responses with the requests,
        and handle errors appropriately.

    Args:
        jobs (list[JobRequest]): The list of job requests
        timeout (int, optional): The timeout for the request. Defaults to 10.

    Returns:
        list[Union[JobResponse, ErrorResponse]]: The list of job IDs or error
        responses for each job request.

    Raises:
        APIError: If the request returns an error code
        asyncio.TimeoutError: If the request times out
    """

    url = f"{self.base_url}/api/jobs/batch"
    async with ClientSession() as session:
        async with session.post(
            url,
            json=jobs,
            timeout=ClientTimeout(total=timeout),
        ) as response:
            # Read the body up front: it carries either the per-job responses
            # or the error details that are forwarded in APIError.
            body = await response.json()
            try:
                response.raise_for_status()
            except ClientResponseError as e:
                raise APIError(
                    e.status,
                    body.get("error", "Unknown error"),
                    body.get("params", None),
                ) from e
            return cast(list[Union[JobResponse, ErrorResponse]], body)

request_stream(job, timeout=180) async

Requests a streaming job

Parameters:

Name Type Description Default
job JobRequest

The streaming job request

required
timeout int

The timeout for the request. Since this is a synchronous request, the timeout is the maximum time to wait for the server to finish streaming the response. Defaults to 180.

180

Yields:

Type Description
AsyncGenerator[Union[str, bytes], None]

Union[str, bytes]: The job ID followed by the output of the job

Raises:

Type Description
APIError

If the request returns an error code

TimeoutError

If the request times out

Source code in src/infernet_client/node.py
async def request_stream(
    self, job: JobRequest, timeout: int = 180
) -> AsyncGenerator[Union[str, bytes], None]:
    """Requests a streaming job

    The first line of the response is the job ID; everything after it is the
    job's output. The job ID is yielded once as a ``str``, followed by the
    output as raw ``bytes`` chunks.

    Args:
        job (JobRequest): The streaming job request
        timeout (int, optional): The timeout for the request. Since this is a
            synchronous request, the timeout is the maximum time to wait for the
            server to finish streaming the response. Defaults to 180.

    Yields:
        Union[str, bytes]: The job ID followed by the output of the job

    Raises:
        APIError: If the request returns an error code
        asyncio.TimeoutError: If the request times out
    """

    url = f"{self.base_url}/api/jobs/stream"
    async with ClientSession() as session:
        async with session.post(
            url,
            json=job,
            timeout=ClientTimeout(total=timeout),
        ) as response:
            if response.status != 200:
                body = await response.json()
                raise APIError(
                    response.status,
                    body.get("error", "Unknown error"),
                    body.get("params", None),
                )

            # iter_any() yields whatever bytes are available, so the job ID
            # line may arrive split across chunks or fused with the start of
            # the output. Buffer until the first newline to separate them.
            pending = b""
            job_id_sent = False
            async for chunk in response.content.iter_any():
                if job_id_sent:
                    yield chunk
                    continue

                pending += chunk
                id_line, newline, remainder = pending.partition(b"\n")
                if not newline:
                    # Job ID line not complete yet; keep buffering
                    continue

                job_id_sent = True
                yield id_line.decode("utf-8").strip()
                if remainder:
                    # Output bytes that shared a chunk with the job ID
                    yield remainder

            # Stream ended without a newline: treat what was received as the
            # job ID rather than silently dropping it.
            if not job_id_sent and pending:
                yield pending.decode("utf-8").strip()