Skip to content

Advanced Diameter Examples

This document provides advanced examples of implementing Diameter protocol interfaces, including load balancing, high availability, and vendor-specific integrations.

Load Balancing Client

from diameter.client import DiameterClient
from diameter.avp import AVP
import asyncio
import random
from typing import List, Dict

class LoadBalancedDiameterClient:
    """Distribute Diameter requests across several peers in round-robin order.

    One ``DiameterClient`` connection is kept per peer host in ``clients``.
    Only hosts whose connection attempt succeeded are listed in
    ``active_peers``, which is the rotation used to pick the next peer.
    """

    def __init__(self, peers: List[Dict[str, str]], realm: str):
        """
        Initialize load balanced client with multiple peers

        Args:
            peers: List of peer configurations; each entry is read through
                its 'host' and 'port' keys
            realm: Local realm
        """
        self.peers = peers
        self.realm = realm
        self.clients: Dict[str, DiameterClient] = {}
        self.active_peers: List[str] = []

    async def connect_all(self):
        """Establish connections to all configured peers

        Hosts that already have a connection are skipped, so calling this
        method again (for example to retry peers that failed earlier) neither
        replaces an open connection nor lists a host twice in the rotation.
        Connection failures are reported with ``print`` and do not stop the
        remaining peers from being tried.
        """
        for peer in self.peers:
            host = peer['host']
            if host in self.clients:
                # Connections are keyed by host; keep the one already open.
                continue

            client = DiameterClient()
            try:
                await client.connect(host, peer['port'])
            except Exception as e:
                # Best effort: one unreachable peer must not block the others.
                print(f"Failed to connect to {host}: {e}")
                continue

            self.clients[host] = client
            self.active_peers.append(host)
            print(f"Connected to peer: {host}")

    def get_next_peer(self) -> str:
        """Get next peer using round-robin selection

        Returns:
            Host of the peer at the head of the rotation

        Raises:
            RuntimeError: If no peer is currently active
        """
        if not self.active_peers:
            raise RuntimeError("No active peers available")

        # Round-robin: take the head of the rotation and move it to the back.
        peer = self.active_peers.pop(0)
        self.active_peers.append(peer)
        return peer

    async def send_request(self, command: str, avps: "List[AVP]"):
        """Send request to next available peer

        Args:
            command: Command name handed to the peer client's
                ``create_request``
            avps: AVPs handed to the peer client's ``create_request``

        Returns:
            Whatever the selected peer client's ``send_request`` returns

        Raises:
            RuntimeError: If no peer is currently active
        """
        peer = self.get_next_peer()
        client = self.clients[peer]

        request = client.create_request(command, avps)
        return await client.send_request(request)

    async def close(self):
        """Close all client connections

        The peer tables are emptied before the connections are closed, so a
        request issued afterwards fails with "No active peers available"
        instead of being routed to a closed connection.
        """
        clients = list(self.clients.values())
        self.clients.clear()
        self.active_peers.clear()

        for client in clients:
            await client.close()

# Usage example
async def main():
    """Connect to three example peers and send five CCR requests through them.

    Each request goes to the next peer chosen by the load balanced client.
    A failed request is reported and does not stop the remaining ones. The
    connections are closed in a ``finally`` block so they are released even
    when the coroutine is cancelled or an unexpected error escapes the loop.
    """
    peers = [
        {'host': 'peer1.example.com', 'port': 3868},
        {'host': 'peer2.example.com', 'port': 3868},
        {'host': 'peer3.example.com', 'port': 3868}
    ]

    client = LoadBalancedDiameterClient(peers, 'example.com')
    try:
        await client.connect_all()

        # Send requests to different peers
        for _ in range(5):
            try:
                response = await client.send_request('CCR', [
                    AVP('Session-Id', 'test-session'),
                    AVP('Origin-Host', 'client.example.com'),
                    AVP('Origin-Realm', 'example.com')
                ])
                print(f"Request successful: {response.result_code}")
            except Exception as e:
                print(f"Request failed: {e}")
    finally:
        await client.close()

Session Management

from dataclasses import dataclass
from datetime import datetime, timedelta
import asyncio

@dataclass
class ChargingSession:
    """State kept for one charging session tracked by SessionManager."""
    # Value sent in the Session-Id AVP of every request for this session.
    session_id: str
    # Value sent as Subscription-Id-Data when the session is created.
    subscription_id: str
    # Taken from the Granted-Service-Unit AVP of the answer; later grants are added to it.
    granted_units: int
    # Latest usage reported by the caller; sent as CC-Time inside Used-Service-Unit.
    used_units: int
    # Creation time plus the validity (in seconds) read from the answer.
    expiry: datetime
    state: str  # ACTIVE, EXPIRED, TERMINATED

class SessionManager:
    """Create, update and terminate credit-control charging sessions.

    Every request is sent as a 'CCR' through ``client.send_request``. The
    resulting ``ChargingSession`` objects are kept in ``sessions`` keyed by
    session id until they are terminated.
    """

    def __init__(self, client, destination_realm: str = 'charging.example.com'):
        """
        Args:
            client: Object providing ``host``, ``realm`` and an awaitable
                ``send_request(command, avps)``
            destination_realm: Value sent in the Destination-Realm AVP of
                every request
        """
        self.client = client
        self.destination_realm = destination_realm
        self.sessions: Dict[str, ChargingSession] = {}
        # CC-Request-Number to use for the next request of each session.
        self._request_numbers: Dict[str, int] = {}

    def _build_avps(self, session_id: str, request_type: int,
                    request_number: int, extra_avps: list) -> list:
        """Return the AVPs shared by every CCR, followed by ``extra_avps``."""
        return [
            AVP('Session-Id', session_id),
            AVP('Origin-Host', self.client.host),
            AVP('Origin-Realm', self.client.realm),
            AVP('Destination-Realm', self.destination_realm),
            AVP('Auth-Application-Id', 4),
            AVP('CC-Request-Type', request_type),
            AVP('CC-Request-Number', request_number),
            *extra_avps,
        ]

    def _next_request_number(self, session_id: str) -> int:
        """Return the CC-Request-Number for the next request and advance it."""
        # The initial request always uses 0, so a session without a recorded
        # counter continues at 1.
        number = self._request_numbers.get(session_id, 1)
        self._request_numbers[session_id] = number + 1
        return number

    async def create_session(self, subscription_id: str, requested_units: int):
        """Create new charging session

        Args:
            subscription_id: Sent as Subscription-Id-Data
            requested_units: Sent as CC-Time inside Requested-Service-Unit

        Returns:
            The new ``ChargingSession``, also stored in ``sessions``

        Raises:
            DiameterError: If the answer's result code is not 2001
        """
        # Imported here because the snippet's import block does not provide it.
        import uuid

        session_id = str(uuid.uuid4())

        avps = self._build_avps(session_id, 1, 0, [  # 1 = INITIAL_REQUEST
            AVP('Subscription-Id', [
                AVP('Subscription-Id-Type', 0),
                AVP('Subscription-Id-Data', subscription_id)
            ]),
            AVP('Requested-Service-Unit', [
                AVP('CC-Time', requested_units)
            ])
        ])

        response = await self.client.send_request('CCR', avps)

        if response.result_code != 2001:
            raise DiameterError(response.result_code)

        granted_units = response.get_avp('Granted-Service-Unit')
        validity = response.get_avp('Validity-Time', 3600)  # Default 1 hour

        session = ChargingSession(
            session_id=session_id,
            subscription_id=subscription_id,
            granted_units=granted_units,
            used_units=0,
            expiry=datetime.now() + timedelta(seconds=validity),
            state='ACTIVE'
        )

        self.sessions[session_id] = session
        # Number 0 was used by the initial request above.
        self._request_numbers[session_id] = 1
        return session

    async def update_session(self, session_id: str, used_units: int):
        """Update session with used units and request more if needed

        Args:
            session_id: Key of a session in ``sessions``
            used_units: New total stored on the session

        Raises:
            ValueError: If the session id is unknown
            DiameterError: If more units are requested and the answer's
                result code is not 2001
        """
        session = self.sessions.get(session_id)
        if not session:
            raise ValueError("Session not found")

        session.used_units = used_units

        if session.used_units >= session.granted_units * 0.8:  # 80% threshold
            await self._request_more_units(session)

    async def _request_more_units(self, session: "ChargingSession",
                                  requested_units: int = 600):
        """Request additional units for ongoing session

        Args:
            session: Session to extend
            requested_units: Sent as CC-Time inside Requested-Service-Unit;
                the default asks for 10 more minutes

        Returns:
            The value read from the answer's Granted-Service-Unit AVP, which
            is also added to ``session.granted_units``

        Raises:
            DiameterError: If the answer's result code is not 2001
        """
        # Each update gets its own number so the Session-Id/CC-Request-Number
        # pair stays unique within the session.
        request_number = self._next_request_number(session.session_id)

        avps = self._build_avps(session.session_id, 2, request_number, [  # 2 = UPDATE_REQUEST
            AVP('Used-Service-Unit', [
                AVP('CC-Time', session.used_units)
            ]),
            AVP('Requested-Service-Unit', [
                AVP('CC-Time', requested_units)
            ])
        ])

        response = await self.client.send_request('CCR', avps)

        if response.result_code != 2001:
            raise DiameterError(response.result_code)

        additional_units = response.get_avp('Granted-Service-Unit')
        session.granted_units += additional_units
        return additional_units

    async def terminate_session(self, session_id: str):
        """Terminate charging session

        Args:
            session_id: Key of a session in ``sessions``

        Returns:
            True once the session is marked TERMINATED and removed

        Raises:
            ValueError: If the session id is unknown
            DiameterError: If the answer's result code is not 2001
        """
        session = self.sessions.get(session_id)
        if not session:
            raise ValueError("Session not found")

        # One more than the last request sent for this session.
        request_number = self._next_request_number(session.session_id)

        avps = self._build_avps(session.session_id, 3, request_number, [  # 3 = TERMINATION_REQUEST
            AVP('Used-Service-Unit', [
                AVP('CC-Time', session.used_units)
            ])
        ])

        response = await self.client.send_request('CCR', avps)

        if response.result_code != 2001:
            raise DiameterError(response.result_code)

        session.state = 'TERMINATED'
        del self.sessions[session_id]
        self._request_numbers.pop(session_id, None)
        return True

Vendor Integration Examples

Nokia Integration

class NokiaDiameterClient(BasicDiameterClient):
    """Nokia-specific Diameter client implementation"""

    async def send_policy_request(self, subscriber_id: str, service_info: Dict):
        """Send policy request to Nokia PCRF

        Builds an initial Gx 'CCR' with a freshly generated Session-Id and
        two vendor-specific AVPs, then sends it through ``self.client``.
        NOTE(review): ``self.host``, ``self.realm`` and ``self.client`` are
        presumably provided by BasicDiameterClient - confirm.

        Args:
            subscriber_id: Sent as Subscription-Id-Data
            service_info: Value of the Nokia-Service-Info AVP

        Returns:
            Whatever ``self.client.send_request`` returns
        """
        # Imported here because this snippet has no import block of its own.
        import uuid

        avps = [
            AVP('Session-Id', str(uuid.uuid4())),
            AVP('Origin-Host', self.host),
            AVP('Origin-Realm', self.realm),
            AVP('Destination-Realm', 'nokia.example.com'),
            AVP('Auth-Application-Id', 16777238),  # Gx
            AVP('CC-Request-Type', 1),
            AVP('CC-Request-Number', 0),
            AVP('Subscription-Id', [
                AVP('Subscription-Id-Type', 0),
                AVP('Subscription-Id-Data', subscriber_id)
            ]),
            # Nokia vendor-specific AVPs
            AVP('Nokia-Service-Info', service_info, vendor_id=28458),
            AVP('Nokia-Policy-Control', 1, vendor_id=28458)
        ]

        request = self.client.create_request('CCR', avps)
        return await self.client.send_request(request)

Ericsson Integration

class EricssonDiameterClient(BasicDiameterClient):
    """Ericsson-specific Diameter client implementation"""

    async def send_charging_request(self, msisdn: str, service_id: str):
        """Send charging request to Ericsson OCS

        Builds an initial 'CCR' with a freshly generated Session-Id and two
        vendor-specific AVPs, then sends it through ``self.client``.
        NOTE(review): ``self.host``, ``self.realm`` and ``self.client`` are
        presumably provided by BasicDiameterClient - confirm.

        Args:
            msisdn: Sent as Subscription-Id-Data
            service_id: Value of the Ericsson-Service-Id AVP

        Returns:
            Whatever ``self.client.send_request`` returns
        """
        # Imported here because this snippet has no import block of its own.
        import uuid

        avps = [
            AVP('Session-Id', str(uuid.uuid4())),
            AVP('Origin-Host', self.host),
            AVP('Origin-Realm', self.realm),
            AVP('Destination-Realm', 'ericsson.example.com'),
            AVP('Auth-Application-Id', 4),
            AVP('CC-Request-Type', 1),
            AVP('CC-Request-Number', 0),
            AVP('Subscription-Id', [
                AVP('Subscription-Id-Type', 0),
                AVP('Subscription-Id-Data', msisdn)
            ]),
            # Ericsson vendor-specific AVPs
            AVP('Ericsson-Service-Id', service_id, vendor_id=193),
            AVP('Ericsson-Rating-Group', 1, vendor_id=193)
        ]

        request = self.client.create_request('CCR', avps)
        return await self.client.send_request(request)

Testing Examples

import pytest
from unittest.mock import Mock, patch

@pytest.fixture
async def diameter_client():
    """Provide a BasicDiameterClient to a test and close it afterwards.

    NOTE(review): an async fixture only works when an asyncio-aware pytest
    plugin handles it - confirm the project's pytest configuration.
    """
    settings = {
        'host': 'test.example.com',
        'port': 3868,
        'realm': 'test.com',
    }
    test_client = BasicDiameterClient(**settings)
    yield test_client
    # Teardown: runs once the test using the fixture has finished.
    await test_client.close()

async def test_authentication_request(diameter_client):
    """Check that authenticate_user returns True for a 2001 answer.

    NOTE(review): async tests need an asyncio-aware pytest plugin to be
    executed - confirm the project's pytest configuration.
    """
    # Stand-in answer reporting success and no error message.
    success_answer = Mock(result_code=2001, error_message=None)
    send_patch = patch.object(
        diameter_client.client, 'send_request', return_value=success_answer
    )

    with send_patch:
        outcome = await authenticate_user(diameter_client, 'test_user')
        assert outcome is True

async def test_credit_control_request(diameter_client):
    """Check that request_credit returns the units carried by the answer.

    NOTE(review): async tests need an asyncio-aware pytest plugin to be
    executed - confirm the project's pytest configuration.
    """
    # Stand-in answer: success, and every get_avp() lookup yields 300.
    granting_answer = Mock(result_code=2001, get_avp=Mock(return_value=300))
    send_patch = patch.object(
        diameter_client.client, 'send_request', return_value=granting_answer
    )

    with send_patch:
        units = await request_credit(diameter_client, '123456789')
        assert units == 300

async def test_error_handling(diameter_client):
    """Check that send_request_with_retry raises DiameterError for a 3004 answer.

    NOTE(review): async tests need an asyncio-aware pytest plugin to be
    executed - confirm the project's pytest configuration.
    """
    busy_answer = Mock(result_code=3004)  # TOO_BUSY
    send_patch = patch.object(
        diameter_client.client, 'send_request', return_value=busy_answer
    )
    request = Mock()

    with send_patch, pytest.raises(DiameterError) as exc_info:
        await send_request_with_retry(diameter_client, request, max_retries=2)

    # The raised error must carry the result code of the failing answer.
    assert exc_info.value.result_code == 3004