Diameter Protocol Examples
This document provides practical examples of implementing Diameter protocol interfaces in various scenarios.
Basic Client Implementation
Client Setup
from diameter.client import DiameterClient
from diameter.avp import AVP
import asyncio
class BasicDiameterClient(DiameterClient):
    """Example client that runs a capabilities exchange as part of connecting."""

    def __init__(self, host, port, realm):
        """Initialise the base client and remember the realm.

        Args:
            host: Forwarded to the base constructor.
            port: Forwarded to the base constructor.
            realm: Stored on the instance and sent as Origin-Realm.
        """
        super().__init__(host, port)
        self.realm = realm

    async def connect(self):
        """Establish the connection and perform the capabilities exchange.

        Raises:
            ConnectionError: If the capabilities exchange does not succeed.
        """
        await self.establish_connection()
        # capability_exchange() reports failure through its return value, so
        # it has to be checked here; otherwise a rejected CER would look like
        # a successful connection to the caller.
        if not await self.capability_exchange():
            raise ConnectionError("Capabilities exchange was rejected by the peer")

    async def capability_exchange(self):
        """Send a CER and return True when the answer's result code is 2001."""
        request = self.create_cer()
        response = await self.send_request(request)
        return response.result_code == 2001  # DIAMETER_SUCCESS

    def create_cer(self):
        """Build the Capabilities-Exchange-Request for this client.

        NOTE(review): reads self.host and self.host_ip, which are presumably
        set by the DiameterClient base class -- confirm.
        """
        avps = [
            AVP('Origin-Host', self.host),
            AVP('Origin-Realm', self.realm),
            AVP('Host-IP-Address', self.host_ip),
            AVP('Vendor-Id', 0),
            AVP('Product-Name', 'Example Diameter Client'),
            AVP('Supported-Vendor-Id', 10415),  # 3GPP
            AVP('Auth-Application-Id', 4),  # Diameter Credit Control
        ]
        return self.create_request('Capabilities-Exchange-Request', avps)
Usage Example
async def main():
    """Connect the example client, then call send_dwr() every 30 seconds.

    Any exception is printed; the client is disconnected on every exit path.
    """
    diameter_client = BasicDiameterClient(
        host="client.example.com",
        port=3868,
        realm="example.com",
    )
    keepalive_seconds = 30
    try:
        await diameter_client.connect()
        print("Connected to Diameter server")
        # Keep the connection alive until an error interrupts the loop.
        while True:
            await diameter_client.send_dwr()
            await asyncio.sleep(keepalive_seconds)
    except Exception as exc:
        print(f"Error: {exc!s}")
    finally:
        await diameter_client.disconnect()


if __name__ == "__main__":
    asyncio.run(main())
Authentication Examples
Authentication Request
async def send_auth_request(client, user_name, service_type,
                            destination_realm='auth.example.com'):
    """Build an AA-Request for a user and send it through the given client.

    Args:
        client: Object providing host, realm, create_request() and
            send_request().
        user_name: Value of the User-Name AVP.
        service_type: Value of the Service-Type AVP.
        destination_realm: Value of the Destination-Realm AVP.

    Returns:
        Whatever client.send_request() returns for the request.
    """
    # Local import keeps this example self-contained.
    import uuid

    avps = [
        # A fresh UUID makes the Session-Id unique per call.
        AVP('Session-Id', f"session.{uuid.uuid4()}"),
        AVP('Origin-Host', client.host),
        AVP('Origin-Realm', client.realm),
        AVP('Destination-Realm', destination_realm),
        AVP('Auth-Application-Id', 1),  # NASREQ
        AVP('User-Name', user_name),
        AVP('Service-Type', service_type),
    ]
    request = client.create_request('AA-Request', avps)
    return await client.send_request(request)
# Usage
async def authenticate_user(client, username):
    """Authenticate a user and report the outcome on stdout.

    Returns:
        True when the answer's result code is 2001; False otherwise,
        including when any exception is raised along the way.
    """
    framed = 2  # Service-Type used by this example (FRAMED)
    try:
        answer = await send_auth_request(
            client=client,
            user_name=username,
            service_type=framed,
        )
        accepted = answer.result_code == 2001
        if not accepted:
            print(f"Authentication failed: {answer.error_message}")
            return False
        print(f"User {username} authenticated successfully")
        return True
    except Exception as exc:
        print(f"Authentication error: {exc!s}")
        return False
Charging Examples
Real-time Charging Session
class ChargingError(Exception):
    """Raised when a credit-control request is not answered with success."""


class ChargingSession:
    """One credit-control session: start, update and terminate requests."""

    def __init__(self, client, subscription_id):
        """Create a session with a unique Session-Id and request number 0.

        Args:
            client: Object providing host, realm, create_request() and
                send_request().
            subscription_id: Value of the Subscription-Id AVP.
        """
        # Local import keeps this example self-contained.
        import uuid

        self.client = client
        self.subscription_id = subscription_id
        self.session_id = f"charging.{uuid.uuid4()}"
        self.request_number = 0

    async def start(self):
        """Initialize charging session.

        Returns:
            The 'Granted-Service-Unit' AVP of the answer.

        Raises:
            ChargingError: If the answer's result code is not 2001.
        """
        request = self._create_ccr('INITIAL_REQUEST')
        response = await self.client.send_request(request)
        if response.result_code != 2001:
            raise ChargingError("Failed to start charging session")
        return response.get_avp('Granted-Service-Unit')

    async def update(self, used_units):
        """Update charging session with usage.

        Args:
            used_units: Value reported in the Used-Service-Unit AVP.

        Returns:
            The 'Granted-Service-Unit' AVP of the answer.

        Raises:
            ChargingError: If the answer's result code is not 2001.
        """
        request = self._create_ccr(
            'UPDATE_REQUEST',
            used_service_unit=used_units,
        )
        response = await self.client.send_request(request)
        if response.result_code != 2001:
            raise ChargingError("Failed to update charging session")
        return response.get_avp('Granted-Service-Unit')

    async def terminate(self, used_units):
        """End charging session, reporting the final usage.

        The answer is not inspected.
        """
        request = self._create_ccr(
            'TERMINATION_REQUEST',
            used_service_unit=used_units,
        )
        await self.client.send_request(request)

    def _create_ccr(self, request_type, used_service_unit=None):
        """Build a Credit-Control-Request and advance the request counter.

        Args:
            request_type: Value of the CC-Request-Type AVP.
            used_service_unit: When not None, added as Used-Service-Unit.
        """
        avps = [
            AVP('Session-Id', self.session_id),
            AVP('Origin-Host', self.client.host),
            AVP('Origin-Realm', self.client.realm),
            AVP('Destination-Realm', 'charging.example.com'),
            AVP('Auth-Application-Id', 4),
            # NOTE(review): this value looks like an e-mail-obfuscation
            # placeholder rather than a real service context identifier
            # (e.g. "32251@3gpp.org") -- confirm and replace.
            AVP('Service-Context-Id', '[email protected]'),
            AVP('CC-Request-Type', request_type),
            AVP('CC-Request-Number', self.request_number),
            AVP('Subscription-Id', self.subscription_id),
        ]
        if used_service_unit is not None:
            avps.append(AVP('Used-Service-Unit', used_service_unit))
        # Incremented after the AVP list is built, so the first request
        # carries number 0.
        self.request_number += 1
        return self.client.create_request('Credit-Control-Request', avps)
Usage Monitoring
async def monitor_usage(client, subscription_id, total_quota):
    """Simulate consumption against a charging session until a quota is used.

    Starts a ChargingSession, consumes up to 1000 units per one-second step,
    requests a new grant once at least 80% of the current grant is used, and
    always sends exactly one termination request with the total usage.

    Args:
        client: Passed to ChargingSession.
        subscription_id: Passed to ChargingSession.
        total_quota: Monitoring stops once this many units have been used.
    """
    session = ChargingSession(client, subscription_id)
    used_units = 0
    try:
        # Start session
        granted_units = await session.start()
        # Monitor usage
        while used_units < total_quota:
            if granted_units <= 0:
                # With nothing granted, usage can never advance and the loop
                # would spin forever.
                print("No service units granted; stopping usage monitoring")
                break
            # Simulate usage
            current_usage = min(granted_units, 1000)
            used_units += current_usage
            # Update when 80% of granted units used
            if current_usage >= (granted_units * 0.8):
                granted_units = await session.update(current_usage)
            await asyncio.sleep(1)  # Simulate time passing
    except ChargingError as e:
        print(f"Charging error: {str(e)}")
    except Exception as e:
        print(f"Unexpected error: {str(e)}")
    finally:
        # Single termination point: runs after normal completion and after
        # either error path, so the request is never sent twice.
        await session.terminate(used_units)
Load Balancing Example
class LoadBalancedDiameterClient:
    """Spread requests over several Diameter peers in round-robin order."""

    def __init__(self, config):
        """Read the peer list from the configuration.

        Args:
            config: Mapping with a 'peers' list; each peer is a mapping with
                'host', 'port' and 'realm' keys.
        """
        self.peers = config['peers']
        self.current_peer = 0  # index into self.peers of the next peer to try
        self.clients = {}  # peer host -> connected client

    async def initialize(self):
        """Initialize connections to all peers"""
        for peer in self.peers:
            client = DiameterClient(
                host=peer['host'],
                port=peer['port'],
                realm=peer['realm'],
            )
            await client.connect()
            self.clients[peer['host']] = client

    async def send_request(self, request):
        """Send request with round-robin load balancing.

        Each call starts at the peer after the one used last and, on a
        ConnectionError, moves on to the next until every peer has been
        tried once.

        Returns:
            The response of the first peer that accepts the request.

        Raises:
            ConnectionError: If every peer fails, or no peers are configured.
        """
        last_error = None
        for _ in range(len(self.peers)):
            peer = self.peers[self.current_peer]
            client = self.clients[peer['host']]
            # Advance before sending so the next request goes to the
            # following peer whether this attempt succeeds or fails.
            self.current_peer = (self.current_peer + 1) % len(self.peers)
            try:
                return await client.send_request(request)
            except ConnectionError as exc:
                # Try next peer
                last_error = exc
        raise ConnectionError("All peers unavailable") from last_error
Vendor-Specific Examples
Nokia Integration
class NokiaChargingClient(DiameterClient):
    """Client that builds service requests carrying the Nokia vendor id."""

    def __init__(self, config):
        """Initialise the client from a configuration mapping.

        Args:
            config: Mapping with 'host' and 'port'; 'realm' is used when
                present.
        """
        super().__init__(
            host=config['host'],
            port=config['port']
        )
        # create_service_request() sends self.realm as Origin-Realm, so
        # populate it from the config when a realm is supplied.
        if 'realm' in config:
            self.realm = config['realm']
        self.vendor_id = 28458  # Nokia

    def create_service_request(self, session_id, service_type):
        """Build a 'Service-Request' for the given session and service type."""
        avps = [
            AVP('Session-Id', session_id),
            AVP('Origin-Host', self.host),
            AVP('Origin-Realm', self.realm),
            AVP('Vendor-Id', self.vendor_id),
            AVP('Nokia-Service-Type', service_type),
            # Add other Nokia-specific AVPs
        ]
        return self.create_request('Service-Request', avps)
Ericsson Integration
class EricssonChargingClient(DiameterClient):
    """Client that builds quota requests carrying the Ericsson vendor id."""

    def __init__(self, config):
        """Initialise the client from a configuration mapping.

        Args:
            config: Mapping with 'host' and 'port'; 'realm' is used when
                present.
        """
        super().__init__(
            host=config['host'],
            port=config['port']
        )
        # create_quota_request() sends self.realm as Origin-Realm, so
        # populate it from the config when a realm is supplied.
        if 'realm' in config:
            self.realm = config['realm']
        self.vendor_id = 193  # Ericsson

    def create_quota_request(self, session_id, quota_type):
        """Build a 'Quota-Request' for the given session and quota type."""
        avps = [
            AVP('Session-Id', session_id),
            AVP('Origin-Host', self.host),
            AVP('Origin-Realm', self.realm),
            AVP('Vendor-Id', self.vendor_id),
            AVP('Ericsson-Quota-Type', quota_type),
            # Add other Ericsson-specific AVPs
        ]
        return self.create_request('Quota-Request', avps)
Error Handling Examples
Retry Mechanism
class RetryableClient(DiameterClient):
    """Client that retries failed requests with exponential backoff."""

    async def send_request_with_retry(self, request, max_retries=3,
                                      initial_backoff=1):
        """Send a request, retrying on connection and non-permanent errors.

        Args:
            request: Passed unchanged to send_request().
            max_retries: Maximum number of attempts; must be at least 1.
            initial_backoff: Seconds to wait after the first failed attempt;
                the wait doubles after every further failure.

        Returns:
            Whatever send_request() returns on the first successful attempt.

        Raises:
            ValueError: If max_retries is less than 1.
            ConnectionError: If the last attempt fails with a connection error.
            DiameterError: If the error is permanent, or the last attempt
                fails with a Diameter error.
        """
        if max_retries < 1:
            # Without this check the call would return None without ever
            # sending the request.
            raise ValueError("max_retries must be at least 1")

        backoff = initial_backoff
        for attempt in range(1, max_retries + 1):
            try:
                return await self.send_request(request)
            except (ConnectionError, DiameterError) as e:
                # Connection errors are always retryable; any other error is
                # retried only when it does not report itself as permanent.
                permanent = not isinstance(e, ConnectionError) and e.is_permanent()
                if permanent or attempt == max_retries:
                    raise
            # Exponential backoff
            await asyncio.sleep(backoff)
            backoff *= 2
Error Response Handling
class ErrorHandler:
    """Turn errors into short human-readable messages."""

    @staticmethod
    def handle_error(error):
        """Return a message for a DiameterError result code or any other error.

        Known result codes map to fixed messages; other DiameterError codes
        and non-Diameter errors are formatted into a generic message.
        """
        if not isinstance(error, DiameterError):
            return f"Unexpected error: {str(error)}"

        known_messages = {
            4001: "Authentication failed",  # DIAMETER_AUTHENTICATION_REJECTED
            4010: "Service denied",         # DIAMETER_END_USER_SERVICE_DENIED
            4012: "Credit limit reached",   # DIAMETER_CREDIT_LIMIT_REACHED
            5001: "Unsupported AVP",        # DIAMETER_AVP_UNSUPPORTED
            5004: "Invalid AVP value",      # DIAMETER_INVALID_AVP_VALUE
            5012: "Unable to comply",       # DIAMETER_UNABLE_TO_COMPLY
        }
        code = error.result_code
        if code in known_messages:
            return known_messages[code]
        return f"Diameter error: {code}"
Configuration Examples
Basic Configuration
diameter:
  client:
    host: client.example.com
    port: 3868
    realm: example.com
    vendor_id: 10415
    product_name: Example Client
  connection:
    retry_count: 3
    timeout: 5
    keepalive: 30
Advanced Configuration
diameter:
  client:
    host: client.example.com
    port: 3868
    realm: example.com
    vendor_id: 10415
    product_name: Example Client
  connection:
    retry_count: 3
    timeout: 5
    keepalive: 30
    max_pending_requests: 10000
  security:
    tls_enabled: true
    cert_file: /etc/diameter/certs/client.pem
    key_file: /etc/diameter/certs/client.key
    ca_file: /etc/diameter/certs/ca.pem
  peers:
    - host: server1.example.com
      port: 3868
      realm: example.com
      weight: 100
    - host: server2.example.com
      port: 3868
      realm: example.com
      weight: 100
  monitoring:
    metrics_enabled: true
    prometheus_port: 9090
    log_level: INFO
Testing Examples
Unit Test
import unittest
from unittest.mock import Mock, patch
from diameter.client import DiameterClient
class TestDiameterClient(unittest.IsolatedAsyncioTestCase):
    """Unit tests for DiameterClient.authenticate with send_request patched.

    IsolatedAsyncioTestCase is required because the test methods are
    coroutines: a plain TestCase calls them without awaiting, so their
    assertions would never run.
    """

    def setUp(self):
        """Create a fresh client for every test."""
        self.client = DiameterClient(
            host='test.example.com',
            port=3868,
            realm='example.com'
        )

    @patch('diameter.client.DiameterClient.send_request')
    async def test_authentication(self, mock_send):
        """authenticate() is truthy when the answer's result code is 2001."""
        # Arrange
        mock_response = Mock()
        mock_response.result_code = 2001
        mock_send.return_value = mock_response
        # Act
        result = await self.client.authenticate('test_user')
        # Assert
        self.assertTrue(result)
        mock_send.assert_called_once()

    @patch('diameter.client.DiameterClient.send_request')
    async def test_authentication_failure(self, mock_send):
        """authenticate() is falsy when the answer's result code is 4001."""
        # Arrange
        mock_response = Mock()
        mock_response.result_code = 4001
        mock_send.return_value = mock_response
        # Act
        result = await self.client.authenticate('test_user')
        # Assert
        self.assertFalse(result)
        mock_send.assert_called_once()
Integration Test
```python
import pytest
from diameter.client import DiameterClient
@pytest.mark.integration
class TestDiameterIntegration:
    """Integration-marked tests that exercise a connected DiameterClient."""

    # NOTE(review): plain pytest does not run async fixtures or async test
    # functions; presumably an async plugin such as pytest-asyncio is
    # configured for this suite -- confirm.
    @pytest.fixture
    async def client(self):
        """Yield a connected client and disconnect it once the test is done."""
        connected = DiameterClient(
            host="test.example.com",
            port=3868,
            realm="example.com",
        )
        await connected.connect()
        yield connected
        await connected.disconnect()

    async def test_capability_exchange(self, client):
        """The capabilities exchange answer carries result code 2001."""
        answer = await client.capability_exchange()
        assert answer.result_code == 2001

    async def test_authentication(self, client):
        """Authenticating 'test_user' yields result code 2001."""
        answer = await client.authenticate("test_user")
        assert answer.result_code == 2001

    async def test_charging(self, client):
        """A charging session grants units, then accepts update and terminate."""
        charging = ChargingSession(client, "test_subscription")
        granted = await charging.start()
        assert granted > 0
        await charging.update(500)
        await charging.terminate(1000)