Advanced Diameter Examples¶
This document provides advanced examples of implementing Diameter protocol interfaces, including load balancing, high availability, and vendor-specific integrations.
Load Balancing Client¶
from diameter.client import DiameterClient
from diameter.avp import AVP
import asyncio
import random
from typing import List, Dict
class LoadBalancedDiameterClient:
def __init__(self, peers: List[Dict[str, str]], realm: str):
"""
Initialize load balanced client with multiple peers
Args:
peers: List of peer configurations with host and port
realm: Local realm
"""
self.peers = peers
self.realm = realm
self.clients: Dict[str, DiameterClient] = {}
self.active_peers: List[str] = []
async def connect_all(self):
"""Establish connections to all configured peers"""
for peer in self.peers:
client = DiameterClient()
try:
await client.connect(peer['host'], peer['port'])
self.clients[peer['host']] = client
self.active_peers.append(peer['host'])
print(f"Connected to peer: {peer['host']}")
except Exception as e:
print(f"Failed to connect to {peer['host']}: {e}")
def get_next_peer(self) -> str:
"""Get next peer using round-robin selection"""
if not self.active_peers:
raise Exception("No active peers available")
# Round-robin selection
peer = self.active_peers.pop(0)
self.active_peers.append(peer)
return peer
async def send_request(self, command: str, avps: List[AVP]):
"""Send request to next available peer"""
peer = self.get_next_peer()
client = self.clients[peer]
request = client.create_request(command, avps)
return await client.send_request(request)
async def close(self):
"""Close all client connections"""
for client in self.clients.values():
await client.close()
# Usage example
async def main():
peers = [
{'host': 'peer1.example.com', 'port': 3868},
{'host': 'peer2.example.com', 'port': 3868},
{'host': 'peer3.example.com', 'port': 3868}
]
client = LoadBalancedDiameterClient(peers, 'example.com')
await client.connect_all()
# Send requests to different peers
for _ in range(5):
try:
response = await client.send_request('CCR', [
AVP('Session-Id', 'test-session'),
AVP('Origin-Host', 'client.example.com'),
AVP('Origin-Realm', 'example.com')
])
print(f"Request successful: {response.result_code}")
except Exception as e:
print(f"Request failed: {e}")
await client.close()
Session Management¶
from dataclasses import dataclass
from datetime import datetime, timedelta
import asyncio
@dataclass
class ChargingSession:
session_id: str
subscription_id: str
granted_units: int
used_units: int
expiry: datetime
state: str # ACTIVE, EXPIRED, TERMINATED
class SessionManager:
def __init__(self, client):
self.client = client
self.sessions: Dict[str, ChargingSession] = {}
async def create_session(self, subscription_id: str, requested_units: int):
"""Create new charging session"""
session_id = str(uuid.uuid4())
avps = [
AVP('Session-Id', session_id),
AVP('Origin-Host', self.client.host),
AVP('Origin-Realm', self.client.realm),
AVP('Destination-Realm', 'charging.example.com'),
AVP('Auth-Application-Id', 4),
AVP('CC-Request-Type', 1), # INITIAL_REQUEST
AVP('CC-Request-Number', 0),
AVP('Subscription-Id', [
AVP('Subscription-Id-Type', 0),
AVP('Subscription-Id-Data', subscription_id)
]),
AVP('Requested-Service-Unit', [
AVP('CC-Time', requested_units)
])
]
response = await self.client.send_request('CCR', avps)
if response.result_code == 2001:
granted_units = response.get_avp('Granted-Service-Unit')
validity = response.get_avp('Validity-Time', 3600) # Default 1 hour
session = ChargingSession(
session_id=session_id,
subscription_id=subscription_id,
granted_units=granted_units,
used_units=0,
expiry=datetime.now() + timedelta(seconds=validity),
state='ACTIVE'
)
self.sessions[session_id] = session
return session
raise DiameterError(response.result_code)
async def update_session(self, session_id: str, used_units: int):
"""Update session with used units and request more if needed"""
session = self.sessions.get(session_id)
if not session:
raise ValueError("Session not found")
session.used_units = used_units
if session.used_units >= session.granted_units * 0.8: # 80% threshold
await self._request_more_units(session)
async def _request_more_units(self, session: ChargingSession):
"""Request additional units for ongoing session"""
avps = [
AVP('Session-Id', session.session_id),
AVP('Origin-Host', self.client.host),
AVP('Origin-Realm', self.client.realm),
AVP('Destination-Realm', 'charging.example.com'),
AVP('Auth-Application-Id', 4),
AVP('CC-Request-Type', 2), # UPDATE_REQUEST
AVP('CC-Request-Number', 1),
AVP('Used-Service-Unit', [
AVP('CC-Time', session.used_units)
]),
AVP('Requested-Service-Unit', [
AVP('CC-Time', 600) # Request 10 more minutes
])
]
response = await self.client.send_request('CCR', avps)
if response.result_code == 2001:
additional_units = response.get_avp('Granted-Service-Unit')
session.granted_units += additional_units
return additional_units
raise DiameterError(response.result_code)
async def terminate_session(self, session_id: str):
"""Terminate charging session"""
session = self.sessions.get(session_id)
if not session:
raise ValueError("Session not found")
avps = [
AVP('Session-Id', session.session_id),
AVP('Origin-Host', self.client.host),
AVP('Origin-Realm', self.client.realm),
AVP('Destination-Realm', 'charging.example.com'),
AVP('Auth-Application-Id', 4),
AVP('CC-Request-Type', 3), # TERMINATION_REQUEST
AVP('CC-Request-Number', 2),
AVP('Used-Service-Unit', [
AVP('CC-Time', session.used_units)
])
]
response = await self.client.send_request('CCR', avps)
if response.result_code == 2001:
session.state = 'TERMINATED'
del self.sessions[session_id]
return True
raise DiameterError(response.result_code)
Vendor Integration Examples¶
Nokia Integration¶
class NokiaDiameterClient(BasicDiameterClient):
"""Nokia-specific Diameter client implementation"""
async def send_policy_request(self, subscriber_id: str, service_info: Dict):
"""Send policy request to Nokia PCRF"""
avps = [
AVP('Session-Id', str(uuid.uuid4())),
AVP('Origin-Host', self.host),
AVP('Origin-Realm', self.realm),
AVP('Destination-Realm', 'nokia.example.com'),
AVP('Auth-Application-Id', 16777238), # Gx
AVP('CC-Request-Type', 1),
AVP('CC-Request-Number', 0),
AVP('Subscription-Id', [
AVP('Subscription-Id-Type', 0),
AVP('Subscription-Id-Data', subscriber_id)
]),
# Nokia vendor-specific AVPs
AVP('Nokia-Service-Info', service_info, vendor_id=28458),
AVP('Nokia-Policy-Control', 1, vendor_id=28458)
]
request = self.client.create_request('CCR', avps)
return await self.client.send_request(request)
Ericsson Integration¶
class EricssonDiameterClient(BasicDiameterClient):
"""Ericsson-specific Diameter client implementation"""
async def send_charging_request(self, msisdn: str, service_id: str):
"""Send charging request to Ericsson OCS"""
avps = [
AVP('Session-Id', str(uuid.uuid4())),
AVP('Origin-Host', self.host),
AVP('Origin-Realm', self.realm),
AVP('Destination-Realm', 'ericsson.example.com'),
AVP('Auth-Application-Id', 4),
AVP('CC-Request-Type', 1),
AVP('CC-Request-Number', 0),
AVP('Subscription-Id', [
AVP('Subscription-Id-Type', 0),
AVP('Subscription-Id-Data', msisdn)
]),
# Ericsson vendor-specific AVPs
AVP('Ericsson-Service-Id', service_id, vendor_id=193),
AVP('Ericsson-Rating-Group', 1, vendor_id=193)
]
request = self.client.create_request('CCR', avps)
return await self.client.send_request(request)
Testing Examples¶
import pytest
from unittest.mock import Mock, patch
@pytest.fixture
async def diameter_client():
"""Create test client fixture"""
client = BasicDiameterClient(
host='test.example.com',
port=3868,
realm='test.com'
)
yield client
await client.close()
async def test_authentication_request(diameter_client):
"""Test authentication request handling"""
# Mock response
mock_response = Mock()
mock_response.result_code = 2001
mock_response.error_message = None
with patch.object(diameter_client.client, 'send_request',
return_value=mock_response):
result = await authenticate_user(
diameter_client,
'test_user'
)
assert result is True
async def test_credit_control_request(diameter_client):
"""Test credit control request handling"""
# Mock response with granted units
mock_response = Mock()
mock_response.result_code = 2001
mock_response.get_avp = Mock(return_value=300)
with patch.object(diameter_client.client, 'send_request',
return_value=mock_response):
units = await request_credit(
diameter_client,
'123456789'
)
assert units == 300
async def test_error_handling(diameter_client):
"""Test error handling with retries"""
# Mock response with error
mock_response = Mock()
mock_response.result_code = 3004 # TOO_BUSY
with patch.object(diameter_client.client, 'send_request',
return_value=mock_response):
with pytest.raises(DiameterError) as exc_info:
await send_request_with_retry(
diameter_client,
Mock(),
max_retries=2
)
assert exc_info.value.result_code == 3004