Skip to main content

๐Ÿ”Œ Connection

This guide explains how to establish, manage, and close connections to the ZTeraDB Server using the core ZTeraDBConnectionAsync client engine.


๐Ÿ”Œ What is ZTeraDBConnectionAsync?โ€‹

The ZTeraDBConnectionAsync class serves as the primary asynchronous network broker for your application. It abstracts low-level non-blocking socket management and handles:

  • ๐Ÿ” Secure Handshakes: Opens and manages non-blocking TCP/TLS streams directly to the server using asyncio.
  • ๐ŸŽซ Session Auth: Handles initial token validations using your configuration keys.
  • ๐Ÿš€ High-Throughput Streaming: Executes ZQL payloads and delivers efficient asynchronous memory streams.
  • ๐Ÿ”„ Socket Reuse: Integrates directly with client-side connection pooling layers to maximize database performance.

๐Ÿง  Architectural Overviewโ€‹

ZTeraDB decouples your application from the underlying target systems by acting as a single database proxy router:


๐Ÿ“ฆ Initializing a Connectionโ€‹

The connection constructor accepts your target infrastructure endpoints along with your initialized configuration layout.

from zteradb import ZTeraDBConnectionAsync

# Signature format: ZTeraDBConnectionAsync(host: str, port: int, config: ZTeraDBConfig)
db = ZTeraDBConnectionAsync(
"db.zteradb.com",
7777,
config
)

๐Ÿ”‘ Constructor Parametersโ€‹

ParameterTypeRequiredDescription
hoststrYesThe remote endpoint or network IP address allocated to your cluster runtime. (e.g., "db1.zteradb.com")
portintYesThe active TCP entry port assigned to your instance. Defaults universally to 7777.
configZTeraDBConfigYesAn initialized, valid configuration matrix containing your authentication profile.

๐ŸŽ› Client Methodsโ€‹

  1. await db.run(query: ZTeraDBQuery) Submits an abstracted ZQL query framework directly to the cluster infrastructure socket. Because it executes over non-blocking sockets, this method must be awaited.
query = ZTeraDBQuery("user").select()
result = await db.run(query)
  • Memory Optimization: Depending on your configuration options, results can be processed as an asynchronous stream. Rows are parsed as they arrive over the wire rather than loading the entire payload block into memory at once. It is highly recommended to loop through streaming datasets via async for:
async for row in result:
print(row)
  1. await db.close() -> None Closes active streaming connections, flushes the internal connection pool, and frees up network socket descriptors on the host device.
await db.close()

๐Ÿ’ก Serverless Tip: Always explicitly invoke await db.close() at the conclusion of your script, especially inside ephemeral execution architectures (like AWS Lambda or Google Cloud Functions) to prevent connection leaks.


๐Ÿงช Complete Implementation Blueprintโ€‹

# app.py

import asyncio
import os
from dotenv import load_dotenv
from zteradb import ZTeraDBConnectionAsync, ZTeraDBQuery
from zteradb.config.zteradb_config import ZTeraDBConfig
from zteradb.config.options import Options
from zteradb.config.connection_pool import ConnectionPool
from zteradb.config.response_data_types import ResponseDataTypes
from zteradb.config.envs import ENVS

# Load local environment variables from .env file
load_dotenv()

async def main():
# 1. Configure the connection pool scaling rules
pool_config = ConnectionPool(
min_size=int(os.getenv("MIN_CONNECTION", 1)),
max_size=int(os.getenv("MAX_CONNECTION", 2))
)

# 2. Map structural options and properties
options = Options(
response_data_type=ResponseDataTypes(os.getenv("REQUEST_DATA_TYPE", "json")),
connection_pool=pool_config
)

# 3. Structural configuration extraction
config = ZTeraDBConfig(
client_key=os.getenv("CLIENT_KEY"),
access_key=os.getenv("ACCESS_KEY"),
secret_key=os.getenv("SECRET_KEY"),
database_id=os.getenv("DATABASE_ID"),
env=ENVS(os.getenv("ZTERADB_ENV", "dev")),
options=options
)

# 4. Establishing the network channel driver
db = ZTeraDBConnectionAsync(
os.getenv("ZTERADB_HOST", "db1.zteradb.com"),
int(os.getenv("ZTERADB_PORT", "7777")),
config
)

try:
# 5. Execution Pipeline
query = ZTeraDBQuery("user").select().limit(0, 10)
result = await db.run(query)

# Process the returned rows cleanly
for row in result:
print(row)

except Exception as e:
print(f"Database operation failed: {e}")

finally:
# 6. Resource Cleanup
await db.close()

if __name__ == "__main__":
asyncio.run(main())

โš ๏ธ Troubleshooting Connection Failuresโ€‹

  • โŒ Socket Exception / Timeout Errors: Usually points to incorrect network routing or network access controls blocking access.

    • Fix: Double-check your host endpoint address and make sure outbound connections on port 7777 are permitted by your local firewall or security group policies.
  • โŒ Authentication Rejections: The client can reach the server but your security handshake fails.

    • Fix: Ensure all required keys (client_key, access_key, secret_key, and database_id) are mapped properly through your environment configuration setup.
  • โŒ Resource Leakage Warning: Python warnings about unclosed event loops or high system socket metrics.

    • Fix: Wrap execution processes inside a structural try...finally block to make sure await db.close() runs regardless of processing runtime exceptions.

๐ŸŽ‰ Next Stepโ€‹

Now that your connection pipeline is established, learn how to build complex data lookups: ๐Ÿ‘‰ ZTeraDB Query Guide