PerimeterX Mobile protection is a sophisticated bot detection system that analyzes device fingerprints, behavioral patterns, and cryptographic tokens to block automated requests.
This guide shows you how to generate valid px2/px3 tokens and successfully extract data from protected mobile apps like StockX, TextNow, and GrubHub with a 90%+ success rate.
What You'll Learn
- Detect which PerimeterX version (px2 vs px3) an app uses
- Decode and generate valid authorization tokens
- Handle TLS/JA3 fingerprinting challenges
- Build a resilient session management system
- Implement advanced evasion techniques
Step 1: Detect PerimeterX Implementation
Before attempting any bypass, you need to identify which PerimeterX version is protecting your target app. Each version has distinct characteristics that reveal its presence.
Enhanced Detection Script
import requests
import re
import json
from mitmproxy import http
class PerimeterXDetector:
def __init__(self):
self.px_patterns = {
'px3': r'3:[a-f0-9]+:[A-Za-z0-9+/]+={0,2}:\d+:[^:]+',
'px2': r'2:[A-Za-z0-9+/]+={0,2}',
'px1': r'1:\d+:\d+:[A-Za-z0-9+/]+={0,2}'
}
self.collector_patterns = [
r'collector-[a-zA-Z0-9]+\.perimeterx\.net',
r'collector-[a-zA-Z0-9]+\.px-cloud\.net',
r'https://collector-\w+\.px-cdn\.net/api/v\d/collector'
]
self.px_headers = [
'x-px-authorization',
'x-px-original-token',
'x-px-context',
'x-px-vid',
'x-px-uuid'
]
def detect_from_headers(self, headers):
"""Detect PerimeterX version from request headers"""
results = {
'detected': False,
'version': None,
'tokens': [],
'metadata': {}
}
# Check for PX tokens in headers
for header, value in headers.items():
if header.lower() in self.px_headers:
results['detected'] = True
# Identify version from token format
for version, pattern in self.px_patterns.items():
if re.match(pattern, value):
results['version'] = version
results['tokens'].append({
'header': header,
'value': value,
'version': version
})
# Extract additional metadata
if version == 'px3':
parts = value.split(':')
if len(parts) >= 5:
results['metadata'] = {
'hash': parts[1],
'payload': parts[2],
'app_id': parts[3],
'additional': parts[4]
}
return results
def detect_from_url(self, url):
"""Check if URL matches PerimeterX collector endpoints"""
for pattern in self.collector_patterns:
if re.search(pattern, url):
return True
return False
def detect_from_cookies(self, cookies):
"""Detect PerimeterX from cookie values"""
px_cookies = ['_px', '_px2', '_px3', '_pxhd', '_pxvid']
found_cookies = {}
for cookie_name in px_cookies:
if cookie_name in cookies:
found_cookies[cookie_name] = cookies[cookie_name]
return found_cookies
# Example usage with mitmproxy
def request(flow: http.HTTPFlow) -> None:
detector = PerimeterXDetector()
# Convert headers to dict
headers = dict(flow.request.headers)
# Detect PerimeterX
detection = detector.detect_from_headers(headers)
if detection['detected']:
print(f"[PX Detected] Version: {detection['version']}")
print(f"[PX Metadata] {json.dumps(detection['metadata'], indent=2)}")
# Check URL
if detector.detect_from_url(flow.request.url):
print(f"[PX Collector] {flow.request.url}")
Network Traffic Analysis
import frida
import sys
# Frida script to intercept PerimeterX calls in mobile apps
frida_script = """
Java.perform(function() {
// Hook OkHttp for Android apps
var OkHttpClient = Java.use("okhttp3.OkHttpClient");
var Request = Java.use("okhttp3.Request");
Request.header.implementation = function(name) {
var value = this.header(name);
if (name && name.toLowerCase().indexOf("px") !== -1) {
console.log("[PX Header] " + name + ": " + value);
}
return value;
};
// Hook URL connections
var URL = Java.use("java.net.URL");
URL.openConnection.implementation = function() {
var url = this.toString();
if (url.indexOf("perimeterx") !== -1 || url.indexOf("px-cloud") !== -1) {
console.log("[PX URL] " + url);
}
return this.openConnection();
};
});
"""
def on_message(message, data):
if message['type'] == 'send':
print(message['payload'])
# Attach to target app
device = frida.get_usb_device()
pid = device.spawn(["com.stockx.stockx"])
session = device.attach(pid)
script = session.create_script(frida_script)
script.on('message', on_message)
script.load()
device.resume(pid)
sys.stdin.read()
This detection script gives you a quick read on whether PerimeterX is active, and if so, whether it’s using px2 or px3.
You’ll often find telltale headers like x-px-authorization or URL patterns tied to collector domains. Knowing the version you’re dealing with will help determine your approach later on.
The token structure
Understanding how PerimeterX tokens are structured is key to bypassing the system.
Let’s break it down.
PX3 tokens are formatted like this:
def parse_px3_token(token):
parts = token.split(':')
if len(parts) < 5 or parts[0] != '3':
return None
return {
'version': parts[0],
'hash': parts[1],
'payload': parts[2], # Base64 encoded
'appId': parts[3],
'additional': parts[4]
}
# Example
token = "3:5b0de8f7a1d99c67add:MOXOASOAP5VNA==:1000:6xXzN9Q="
parsed = parse_px3_token(token)
print(parsed)
PX2 tokens, on the other hand, are simpler and usually look like this:
import base64
import json
def parse_px2_token(token):
parts = token.split(':')
if len(parts) < 2 or parts[0] != '2':
return None
try:
# Decode base64 payload
payload = base64.b64decode(parts[1]).decode('utf-8')
# Parse JSON
json_data = json.loads(payload)
return {
'version': parts[0],
'decoded_payload': json_data
}
except Exception as e:
print(f"Error decoding token: {e}")
return None
# Example
token = "2:eyJ1Ijoi7hTyuIdsHHQ2cGxhdGZvcm0iOiJpT1MiLCJ0diI6IjMuMy4wIiwidGFnIjoidjMuMy4wIiwidXVpZCI6ImFiYzEyMzQ1Njc4OSIsImNzcyI6e30sInNjIjpbXSwiY3VzdG9tIjp7fSwiY3RzIjoxNjE5NjM0OTA2Njk3LCJ2aWQiOiJhMTIzNDU2NyIsInNpZCI6ImJkZWYyIn0="
parsed = parse_px2_token(token)
print(parsed)
Grasping the components of these tokens—especially the base64 payloads—is critical. It’ll allow you to decode them, troubleshoot when something goes wrong, and ultimately build your own.
Step 2: Decode Token Structure
Understanding the internal structure of PerimeterX tokens is crucial for generating valid ones.
Advanced Token Parser
import base64
import json
import hashlib
import hmac
from datetime import datetime
class PerimeterXTokenParser:
def __init__(self):
self.token_cache = {}
def parse_px3_token(self, token):
"""Parse and decode PX3 token with full analysis"""
parts = token.split(':')
if len(parts) < 5 or parts[0] != '3':
return None
result = {
'version': parts[0],
'hash': parts[1],
'payload_encoded': parts[2],
'app_id': parts[3],
'signature': parts[4],
'decoded_payload': None,
'timestamp': None,
'device_info': None
}
# Decode base64 payload
try:
payload_decoded = base64.b64decode(parts[2] + '==') # Add padding if needed
# Try to parse as JSON
try:
payload_json = json.loads(payload_decoded)
result['decoded_payload'] = payload_json
# Extract common fields
if 't' in payload_json:
result['timestamp'] = datetime.fromtimestamp(payload_json['t'] / 1000)
if 'u' in payload_json:
result['device_info'] = payload_json['u']
except json.JSONDecodeError:
# Payload might be encrypted or binary
result['decoded_payload'] = payload_decoded.hex()
except Exception as e:
print(f"Error decoding payload: {e}")
return result
def parse_px2_token(self, token):
"""Parse and decode PX2 token"""
parts = token.split(':')
if len(parts) < 2 or parts[0] != '2':
return None
try:
# PX2 tokens are simpler - just version and base64 payload
payload = base64.b64decode(parts[1])
payload_json = json.loads(payload)
return {
'version': parts[0],
'payload': payload_json,
'timestamp': datetime.fromtimestamp(payload_json.get('cts', 0) / 1000),
'uuid': payload_json.get('uuid'),
'vid': payload_json.get('vid'),
'sid': payload_json.get('sid')
}
except Exception as e:
print(f"Error parsing PX2 token: {e}")
return None
def verify_token_signature(self, token, secret_key=None):
"""Verify token signature if secret key is known"""
parts = token.split(':')
if len(parts) < 5:
return False
# Reconstruct the message to sign
message = ':'.join(parts[:4])
if secret_key:
# Calculate expected signature
expected_sig = hmac.new(
secret_key.encode(),
message.encode(),
hashlib.sha256
).hexdigest()[:16] # PX typically uses truncated signatures
return expected_sig == parts[1] # Hash field
return None # Can't verify without key
# Example usage
parser = PerimeterXTokenParser()
# Parse a real PX3 token
px3_token = "3:abc123def456:eyJ0IjoxNjk5MDAwMDAwMDAwLCJ1IjoiZGV2aWNlLWlkLTEyMzQifQ==:1000:sig123"
parsed = parser.parse_px3_token(px3_token)
print(json.dumps(parsed, indent=2, default=str))
Step 3: Generate Valid Tokens
Now comes the meat of the process: generating valid tokens that fool PerimeterX into thinking your requests are legitimate.
Method 1: API-Based Token Generation (Most Reliable)
import requests
import time
import uuid
from typing import Dict, Optional
class TakionAPIGenerator:
"""Production-ready token generator using TakionAPI"""
def __init__(self, api_key: str):
self.api_key = api_key
self.base_url = "https://px.takionapi.tech"
self.session = requests.Session()
self.token_cache = {}
self.cache_duration = 50 # Tokens valid for ~60 seconds
def generate_device_fingerprint(self, app_name: str) -> Dict:
"""Generate consistent device fingerprint for specific app"""
device_profiles = {
"StockX": {
"models": ["SM-G998B", "SM-S908U", "Pixel 7 Pro"],
"android_versions": ["13", "14"],
"app_version": "24.05.01"
},
"TextNow": {
"models": ["SM-A526U", "Pixel 6", "OnePlus 9"],
"android_versions": ["12", "13"],
"app_version": "24.5.1"
},
"GrubHub": {
"models": ["SM-G973U", "Pixel 5", "Xiaomi 12"],
"android_versions": ["12", "13", "14"],
"app_version": "2025.5.1"
}
}
profile = device_profiles.get(app_name, device_profiles["StockX"])
# Use UUID5 for consistent device IDs
device_id = str(uuid.uuid5(uuid.NAMESPACE_DNS, f"{app_name}:{self.api_key}"))
import random
random.seed(device_id) # Consistent randomization
model = random.choice(profile["models"])
android_version = random.choice(profile["android_versions"])
return {
"os": "android",
"osVersion": android_version,
"deviceId": device_id,
"model": model,
"manufacturer": model.split()[0] if ' ' in model else "Samsung",
"userAgent": f"{app_name}/{profile['app_version']} (Android {android_version}; {model})",
"packageName": self._get_package_name(app_name)
}
def _get_package_name(self, app_name: str) -> str:
"""Get Android package name for app"""
packages = {
"StockX": "com.stockx.stockx",
"TextNow": "com.enflick.android.TextNow",
"GrubHub": "com.grubhub.android"
}
return packages.get(app_name, "com.example.app")
def generate_token(self, app_id: str, app_name: str,
px_version: str = "px3",
use_cache: bool = True) -> Optional[Dict]:
"""Generate PX token with caching and retry logic"""
cache_key = f"{app_id}:{app_name}:{px_version}"
# Check cache
if use_cache and cache_key in self.token_cache:
cached = self.token_cache[cache_key]
if time.time() - cached['timestamp'] < self.cache_duration:
return cached['data']
device_info = self.generate_device_fingerprint(app_name)
payload = {
"appId": app_id,
"version": px_version,
"device": device_info,
"continent": "NA", # North America
"manufacturer": device_info["manufacturer"]
}
headers = {
"x-api-key": self.api_key,
"Content-Type": "application/json",
"User-Agent": "TakionAPI-Python/1.0"
}
# Retry logic
max_retries = 3
for attempt in range(max_retries):
try:
response = self.session.post(
f"{self.base_url}/generate",
json=payload,
headers=headers,
timeout=10
)
if response.status_code == 200:
result = response.json()
# Cache the result
if use_cache:
self.token_cache[cache_key] = {
'timestamp': time.time(),
'data': result
}
return result
elif response.status_code == 429:
# Rate limited
retry_after = int(response.headers.get('Retry-After', 5))
time.sleep(retry_after)
else:
print(f"Error: {response.status_code} - {response.text}")
except requests.exceptions.RequestException as e:
print(f"Request failed (attempt {attempt + 1}): {e}")
if attempt < max_retries - 1:
time.sleep(2 ** attempt) # Exponential backoff
return None
Method 2: Custom Token Generator
If you’re working on a smaller project or just experimenting, you can also roll your own simple PX3 token generator.
Keep in mind, this approach might not work on heavily protected apps—but it’s a great way to learn how the system works under the hood.import time
import hashlib
import hmac
import struct
import random
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad
class CustomPX3Generator:
"""Educational token generator showing internal mechanics"""
def __init__(self):
# These would normally come from reverse engineering
self.salt_prefix = b"px_mobile_"
self.aes_key = None # Would need to extract from app
def generate_sensor_data(self, device_id: str) -> Dict:
"""Generate realistic sensor data for mobile device"""
return {
"accelerometer": {
"x": random.uniform(-0.5, 0.5),
"y": random.uniform(-0.5, 0.5),
"z": random.uniform(9.5, 10.5) # Gravity
},
"gyroscope": {
"x": random.uniform(-0.01, 0.01),
"y": random.uniform(-0.01, 0.01),
"z": random.uniform(-0.01, 0.01)
},
"magnetometer": {
"x": random.uniform(-50, 50),
"y": random.uniform(-50, 50),
"z": random.uniform(-50, 50)
},
"orientation": random.randint(0, 3), # Portrait/landscape
"light": random.uniform(10, 1000), # Lux
"proximity": random.choice([0, 5]), # Near/far
"pressure": random.uniform(1000, 1020), # hPa
"temperature": random.uniform(20, 30) # Celsius
}
def generate_touch_events(self) -> list:
"""Generate realistic touch event patterns"""
events = []
timestamp = int(time.time() * 1000)
# Simulate user interaction pattern
for i in range(random.randint(3, 7)):
events.append({
"type": random.choice(["down", "move", "up"]),
"x": random.randint(100, 1000),
"y": random.randint(200, 2000),
"pressure": random.uniform(0.3, 0.8),
"size": random.uniform(0.05, 0.15),
"timestamp": timestamp + (i * random.randint(50, 200))
})
return events
def calculate_risk_score(self, device_info: Dict) -> float:
"""Calculate risk score based on device characteristics"""
score = 0.0
# Factors that affect risk score
if device_info.get("isRooted", False):
score += 0.3
if device_info.get("isEmulator", False):
score += 0.5
if device_info.get("debuggerAttached", False):
score += 0.4
if device_info.get("vpnDetected", False):
score += 0.2
# Normalize to 0-1 range
return min(score, 1.0)
def create_payload(self, app_id: str, device_id: str) -> Dict:
"""Create complete PX3 payload"""
timestamp = int(time.time() * 1000)
payload = {
"t": timestamp,
"u": device_id,
"v": "3.3.0",
"os": "android",
"osv": "13",
"app": app_id,
"sid": self._generate_session_id(),
"vid": self._generate_visitor_id(),
# Timing data
"ts": {
"tst": timestamp - random.randint(100, 500), # Token start time
"mst": random.randint(10, 50), # Measurement time
"rst": random.randint(1000, 5000) # Request start time
},
# Device data
"dd": {
"dm": "SM-G998B", # Device model
"db": "samsung", # Device brand
"dv": "13", # Device version
"dr": "1080x2400", # Resolution
"dp": 3.0, # Pixel density
"dl": "en-US", # Language
"dt": "America/New_York", # Timezone
"dw": False, # Is web view
"de": False # Is emulator
},
# Sensor data
"sd": self.generate_sensor_data(device_id),
# Touch events
"te": self.generate_touch_events(),
# Risk indicators
"ri": {
"rs": self.calculate_risk_score({"isRooted": False}),
"rb": False, # Root detected
"re": False, # Emulator detected
"rd": False, # Debugger detected
"rv": False # VPN detected
}
}
return payload
def _generate_session_id(self) -> str:
"""Generate unique session ID"""
return hashlib.md5(str(uuid.uuid4()).encode()).hexdigest()[:16]
def _generate_visitor_id(self) -> str:
"""Generate persistent visitor ID"""
return hashlib.sha256(str(uuid.uuid4()).encode()).hexdigest()[:32]
def encode_payload(self, payload: Dict) -> str:
"""Encode payload to base64"""
payload_json = json.dumps(payload, separators=(',', ':'))
return base64.b64encode(payload_json.encode()).decode().rstrip('=')
def calculate_hash(self, payload_b64: str, app_id: str, secret: str = None) -> str:
"""Calculate payload hash"""
if secret:
# HMAC-based hash
message = f"{payload_b64}:{app_id}".encode()
hash_obj = hmac.new(secret.encode(), message, hashlib.sha256)
return hash_obj.hexdigest()[:16]
else:
# Simple hash (less secure)
message = f"{self.salt_prefix.decode()}{payload_b64}:{app_id}".encode()
return hashlib.md5(message).hexdigest()[:16]
def generate_token(self, app_id: str, device_id: str, secret: str = None) -> str:
"""Generate complete PX3 token"""
# Create payload
payload = self.create_payload(app_id, device_id)
# Encode payload
payload_b64 = self.encode_payload(payload)
# Calculate hash
hash_value = self.calculate_hash(payload_b64, app_id, secret)
# Generate signature component
sig_data = str(random.random())
signature = base64.b64encode(sig_data.encode()).decode()[:8] + "="
# Construct final token
token = f"3:{hash_value}:{payload_b64}:{app_id}:{signature}"
return token
The takeaway here is this: while custom solutions might work for less secure implementations, most real-world scenarios will require using or mimicking a legitimate mobile environment.
Step 4: Implement Tokens with TLS Fingerprinting
Once you’ve got a valid token, you need to make sure it’s integrated properly in your requests.
That typically means adding it to your headers—often under x-px-authorization, though it can vary by app.def make_px_request(url, px_token, cookies=None):
TLS-Aware Request Handler
import tls_client
import httpx
from curl_cffi import requests as curl_requests
class PerimeterXRequestHandler:
"""Handle requests with proper TLS fingerprinting and PX tokens"""
def __init__(self, px_token_generator):
self.token_generator = px_token_generator
self.clients = self._initialize_clients()
def _initialize_clients(self):
"""Initialize different HTTP clients with proper TLS fingerprints"""
return {
'tls_client': self._setup_tls_client(),
'curl_cffi': self._setup_curl_cffi(),
'httpx': self._setup_httpx()
}
def _setup_tls_client(self):
"""Setup tls_client with Chrome fingerprint"""
session = tls_client.Session(
client_identifier="chrome_120",
random_tls_extension_order=True
)
# Set Chrome-like headers
session.headers = {
"User-Agent": "Mozilla/5.0 (Linux; Android 13; SM-G998B) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Mobile Safari/537.36",
"Accept": "application/json, text/plain, */*",
"Accept-Language": "en-US,en;q=0.9",
"Accept-Encoding": "gzip, deflate, br",
"Connection": "keep-alive",
"Upgrade-Insecure-Requests": "1",
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Site": "same-origin"
}
return session
def _setup_curl_cffi(self):
"""Setup curl_cffi with browser impersonation"""
# curl_cffi can impersonate specific browser versions
return curl_requests.Session(impersonate="chrome120")
def _setup_httpx(self):
"""Setup httpx with HTTP/2 support"""
return httpx.Client(
http2=True,
headers={
"User-Agent": "StockX/24.05.01 (Android 13; SM-G998B)"
},
timeout=30.0
)
def make_request(self, url: str, method: str = "GET",
app_id: str = None, app_name: str = None,
client_type: str = "tls_client", **kwargs):
"""Make request with proper PX token and TLS fingerprint"""
# Generate fresh PX token
if app_id and app_name:
token_data = self.token_generator.generate_token(app_id, app_name)
if token_data and "token" in token_data:
# Add PX headers
headers = kwargs.get("headers", {})
headers.update({
"x-px-authorization": token_data["token"],
"x-px-original-token": token_data.get("original_token", ""),
"x-px-vid": token_data.get("vid", ""),
"x-px-uuid": token_data.get("uuid", "")
})
kwargs["headers"] = headers
# Select client
client = self.clients[client_type]
# Handle different client types
if client_type == "tls_client":
response = client.request(method, url, **kwargs)
elif client_type == "curl_cffi":
response = client.request(method, url, **kwargs)
elif client_type == "httpx":
response = client.request(method, url, **kwargs)
else:
raise ValueError(f"Unknown client type: {client_type}")
return response
def verify_tls_fingerprint(self, test_url: str = "https://tls.peet.ws/api/all"):
"""Verify your TLS fingerprint looks legitimate"""
response = self.make_request(test_url, client_type="tls_client")
data = response.json()
print(f"JA3 Hash: {data.get('ja3_hash')}")
print(f"JA3 Text: {data.get('ja3_text')}")
print(f"HTTP Version: {data.get('http_version')}")
print(f"TLS Version: {data.get('tls_version')}")
# Check if fingerprint matches known browsers
known_browser_ja3 = [
"cd08e31494f9531f560d64c695473da9", # Chrome 120
"b32309a26951912be7dba376398abc3b", # Firefox 115
"773906b0efdefa24a7f2b8eb6985bf37" # Safari 17
]
if data.get('ja3_hash') in known_browser_ja3:
print("✓ TLS fingerprint matches known browser")
else:
print("⚠ TLS fingerprint may be detected as bot")
return data
HTTP/2 Fingerprint Management
import h2.connection
import h2.config
import socket
import ssl
class HTTP2FingerprintManager:
"""Manage HTTP/2 fingerprints to avoid detection"""
def __init__(self):
self.settings = self._get_browser_settings()
def _get_browser_settings(self):
"""Get HTTP/2 settings that match real browsers"""
return {
"chrome": {
"HEADER_TABLE_SIZE": 65536,
"ENABLE_PUSH": 1,
"MAX_CONCURRENT_STREAMS": 1000,
"INITIAL_WINDOW_SIZE": 6291456,
"MAX_FRAME_SIZE": 16777215,
"MAX_HEADER_LIST_SIZE": 262144,
"WINDOW_UPDATE_INCREMENT": 15663105,
"PRIORITY_WEIGHT": 256,
"PSEUDO_HEADER_ORDER": [":method", ":authority", ":scheme", ":path"],
"HEADER_PRIORITY": {
"user-agent": 1,
"accept": 2,
"accept-language": 3,
"accept-encoding": 4
}
},
"firefox": {
"HEADER_TABLE_SIZE": 65536,
"ENABLE_PUSH": 0, # Firefox disables push
"MAX_CONCURRENT_STREAMS": 100,
"INITIAL_WINDOW_SIZE": 131072,
"MAX_FRAME_SIZE": 16384,
"MAX_HEADER_LIST_SIZE": None, # No limit
"WINDOW_UPDATE_INCREMENT": 12517377,
"PRIORITY_WEIGHT": 256,
"PSEUDO_HEADER_ORDER": [":method", ":path", ":authority", ":scheme"],
"HEADER_PRIORITY": {
"user-agent": 1,
"accept": 2,
"accept-language": 3,
"accept-encoding": 4,
"referer": 5
}
}
}
def create_h2_connection(self, browser: str = "chrome"):
"""Create HTTP/2 connection with browser-specific settings"""
config = h2.config.H2Configuration(
client_side=True,
validate_inbound_headers=False
)
conn = h2.connection.H2Connection(config=config)
settings = self.settings[browser]
# Apply browser-specific settings
conn.update_settings({
h2.settings.SettingCodes.HEADER_TABLE_SIZE: settings["HEADER_TABLE_SIZE"],
h2.settings.SettingCodes.ENABLE_PUSH: settings["ENABLE_PUSH"],
h2.settings.SettingCodes.MAX_CONCURRENT_STREAMS: settings["MAX_CONCURRENT_STREAMS"],
h2.settings.SettingCodes.INITIAL_WINDOW_SIZE: settings["INITIAL_WINDOW_SIZE"],
h2.settings.SettingCodes.MAX_FRAME_SIZE: settings["MAX_FRAME_SIZE"]
})
conn.initiate_connection()
return conn
def send_request_with_fingerprint(self, host: str, path: str,
px_token: str, browser: str = "chrome"):
"""Send HTTP/2 request with proper fingerprint"""
# Create SSL context
context = ssl.create_default_context()
context.set_alpn_protocols(['h2'])
# Connect
sock = socket.create_connection((host, 443))
ssock = context.wrap_socket(sock, server_hostname=host)
# Create H2 connection
conn = self.create_h2_connection(browser)
# Prepare headers with correct order
settings = self.settings[browser]
headers = [
(':method', 'GET'),
(':path', path),
(':scheme', 'https'),
(':authority', host),
('user-agent', self._get_user_agent(browser)),
('accept', 'application/json, text/plain, */*'),
('accept-language', 'en-US,en;q=0.9'),
('accept-encoding', 'gzip, deflate, br'),
('x-px-authorization', px_token)
]
# Send request
stream_id = conn.get_next_available_stream_id()
conn.send_headers(stream_id, headers)
ssock.sendall(conn.data_to_send())
# Read response
response_data = b''
while True:
data = ssock.recv(65536)
if not data:
break
events = conn.receive_data(data)
for event in events:
if isinstance(event, h2.events.DataReceived):
response_data += event.data
ssock.sendall(conn.data_to_send())
return response_data
def _get_user_agent(self, browser: str) -> str:
"""Get browser-specific user agent"""
user_agents = {
"chrome": "Mozilla/5.0 (Linux; Android 13) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Mobile Safari/537.36",
"firefox": "Mozilla/5.0 (Android 13; Mobile; rv:115.0) Gecko/115.0 Firefox/115.0"
}
return user_agents.get(browser, user_agents["chrome"])
Step 5: Build Resilient Session Management
Here’s something a lot of developers overlook: PX tokens aren’t meant to last.
Many expire within 60 seconds, so unless you’re updating them regularly, you’ll find yourself blocked again before long.
To keep things running smoothly, I built a session class that handles token refresh and retry logic automatically.class PerimeterXSession:
Advanced Session Manager
import threading
import queue
from datetime import datetime, timedelta
from typing import Optional, Dict, List
import pickle
class PerimeterXSessionManager:
"""Production-ready session manager with pooling and persistence"""
def __init__(self, token_generator, pool_size: int = 5):
self.token_generator = token_generator
self.pool_size = pool_size
self.sessions = queue.Queue(maxsize=pool_size)
self.session_stats = {}
self.lock = threading.Lock()
# Token refresh settings
self.token_ttl = 55 # Refresh before 60s expiry
self.refresh_threads = {}
# Initialize session pool
self._initialize_pool()
def _initialize_pool(self):
"""Initialize pool of sessions with different fingerprints"""
for i in range(self.pool_size):
session = self._create_session(f"session_{i}")
self.sessions.put(session)
def _create_session(self, session_id: str) -> Dict:
"""Create a new session with unique fingerprint"""
# Use different TLS clients for variety
client_types = ["tls_client", "curl_cffi", "httpx"]
client_type = client_types[hash(session_id) % len(client_types)]
# Generate unique device fingerprint
device_seed = hashlib.md5(session_id.encode()).hexdigest()
session = {
"id": session_id,
"client_type": client_type,
"device_seed": device_seed,
"px_token": None,
"token_generated": None,
"request_count": 0,
"success_count": 0,
"failure_count": 0,
"last_used": datetime.now(),
"client": self._create_client(client_type)
}
# Start token refresh thread
self._start_refresh_thread(session_id)
return session
def _create_client(self, client_type: str):
"""Create HTTP client based on type"""
if client_type == "tls_client":
return tls_client.Session(
client_identifier="chrome_120",
random_tls_extension_order=True
)
elif client_type == "curl_cffi":
return curl_requests.Session(impersonate="chrome120")
else:
return httpx.Client(http2=True)
def _start_refresh_thread(self, session_id: str):
"""Start background thread to refresh tokens"""
def refresh_worker():
while True:
try:
session = self._get_session_by_id(session_id)
if session:
if self._should_refresh_token(session):
self._refresh_token(session)
time.sleep(10) # Check every 10 seconds
except Exception as e:
print(f"Error in refresh thread: {e}")
thread = threading.Thread(target=refresh_worker, daemon=True)
thread.start()
self.refresh_threads[session_id] = thread
def _should_refresh_token(self, session: Dict) -> bool:
"""Check if token needs refresh"""
if not session.get("token_generated"):
return True
elapsed = (datetime.now() - session["token_generated"]).total_seconds()
return elapsed >= self.token_ttl
def _refresh_token(self, session: Dict):
"""Refresh PX token for session"""
with self.lock:
token_data = self.token_generator.generate_token(
app_id="PX1000",
app_name="StockX",
device_seed=session["device_seed"]
)
if token_data:
session["px_token"] = token_data["token"]
session["token_generated"] = datetime.now()
print(f"Refreshed token for session {session['id']}")
def _get_session_by_id(self, session_id: str) -> Optional[Dict]:
"""Get session by ID from pool"""
sessions = []
found = None
# Temporarily remove all sessions from queue
while not self.sessions.empty():
try:
session = self.sessions.get_nowait()
if session["id"] == session_id:
found = session
sessions.append(session)
except queue.Empty:
break
# Put sessions back
for session in sessions:
self.sessions.put(session)
return found
def get_session(self) -> Dict:
"""Get an available session from pool"""
session = self.sessions.get(timeout=30)
# Ensure token is fresh
if self._should_refresh_token(session):
self._refresh_token(session)
session["last_used"] = datetime.now()
return session
def release_session(self, session: Dict):
"""Return session to pool"""
session["request_count"] += 1
self.sessions.put(session)
def make_request(self, url: str, method: str = "GET", **kwargs):
"""Make request using pooled session"""
session = self.get_session()
try:
# Add PX headers
headers = kwargs.get("headers", {})
headers["x-px-authorization"] = session["px_token"]
kwargs["headers"] = headers
# Make request
client = session["client"]
response = client.request(method, url, **kwargs)
# Update stats
if response.status_code == 200:
session["success_count"] += 1
else:
session["failure_count"] += 1
return response
finally:
self.release_session(session)
def get_statistics(self) -> Dict:
"""Get session pool statistics"""
stats = {
"pool_size": self.pool_size,
"available_sessions": self.sessions.qsize(),
"total_requests": 0,
"total_successes": 0,
"total_failures": 0,
"sessions": []
}
# Collect stats from all sessions
sessions = []
while not self.sessions.empty():
try:
session = self.sessions.get_nowait()
stats["total_requests"] += session["request_count"]
stats["total_successes"] += session["success_count"]
stats["total_failures"] += session["failure_count"]
stats["sessions"].append({
"id": session["id"],
"requests": session["request_count"],
"success_rate": session["success_count"] / max(session["request_count"], 1),
"last_used": session["last_used"].isoformat()
})
sessions.append(session)
except queue.Empty:
break
# Put sessions back
for session in sessions:
self.sessions.put(session)
stats["overall_success_rate"] = stats["total_successes"] / max(stats["total_requests"], 1)
return stats
def save_state(self, filepath: str):
"""Save session state to file"""
state = {
"sessions": [],
"stats": self.get_statistics()
}
# Extract session data
while not self.sessions.empty():
try:
session = self.sessions.get_nowait()
# Remove non-serializable client object
session_data = {k: v for k, v in session.items() if k != "client"}
state["sessions"].append(session_data)
except queue.Empty:
break
with open(filepath, 'wb') as f:
pickle.dump(state, f)
print(f"Session state saved to {filepath}")
def load_state(self, filepath: str):
"""Load session state from file"""
with open(filepath, 'rb') as f:
state = pickle.load(f)
# Restore sessions
for session_data in state["sessions"]:
# Recreate client
session_data["client"] = self._create_client(session_data["client_type"])
self.sessions.put(session_data)
print(f"Restored {len(state['sessions'])} sessions from {filepath}")
This setup checks whether your token has expired, refreshes it when needed, and retries failed requests after getting a fresh one.
It’s a massive time-saver and helps prevent unnecessary blocks.
Step 6: Handle Common Challenges
Even with the right token, PerimeterX can still block you if other parts of your setup don’t look right.
Here’s how to deal with the most common pitfalls.
Challenge 1: Device Consistency
PerimeterX looks closely at your device's fingerprint.
To keep your session believable, you need consistent and realistic device.
class DeviceFingerprintManager:
"""Maintain consistent device fingerprints across requests"""
def __init__(self, seed: str = None):
self.seed = seed or str(uuid.uuid4())
self.fingerprints = {}
self.fingerprint_file = "device_fingerprints.json"
self.load_fingerprints()
def generate_fingerprint(self, app_name: str) -> Dict:
"""Generate or retrieve consistent fingerprint"""
if app_name in self.fingerprints:
return self.fingerprints[app_name]
# Generate new fingerprint
random.seed(f"{self.seed}:{app_name}")
# Android device properties
manufacturers = ["Samsung", "Google", "OnePlus", "Xiaomi", "OPPO"]
manufacturer = random.choice(manufacturers)
models = {
"Samsung": ["SM-G998B", "SM-S908U", "SM-A536B"],
"Google": ["Pixel 7 Pro", "Pixel 6a", "Pixel 8"],
"OnePlus": ["OnePlus 11", "OnePlus 10 Pro", "OnePlus 9"],
"Xiaomi": ["Xiaomi 13", "Redmi Note 12", "POCO F5"],
"OPPO": ["Find X6 Pro", "Reno 10", "A78"]
}
model = random.choice(models[manufacturer])
# Generate stable hardware IDs
android_id = hashlib.md5(f"{self.seed}:android_id".encode()).hexdigest()
serial = hashlib.md5(f"{self.seed}:serial".encode()).hexdigest()[:8].upper()
imei = self._generate_imei()
mac_address = self._generate_mac()
fingerprint = {
"manufacturer": manufacturer,
"model": model,
"android_version": random.choice(["12", "13", "14"]),
"android_id": android_id,
"serial": serial,
"imei": imei,
"mac_address": mac_address,
"build_id": f"UP1A.{random.randint(100000, 999999)}",
"screen_resolution": random.choice(["1080x2400", "1440x3200", "1080x2340"]),
"dpi": random.choice([420, 480, 560]),
"cpu_cores": random.choice([6, 8, 12]),
"ram_gb": random.choice([6, 8, 12, 16]),
"storage_gb": random.choice([128, 256, 512]),
"battery_capacity": random.randint(4000, 5000),
"sim_operator": random.choice(["310260", "310410", "311480"]), # T-Mobile, AT&T, Verizon
"timezone": "America/New_York",
"language": "en-US",
"bluetooth_name": f"{model}_{random.randint(1000, 9999)}"
}
self.fingerprints[app_name] = fingerprint
self.save_fingerprints()
return fingerprint
def _generate_imei(self) -> str:
"""Generate valid IMEI number"""
# TAC (Type Allocation Code) - 8 digits
tac = "35698104"
# Serial Number - 6 digits
serial = ''.join([str(random.randint(0, 9)) for _ in range(6)])
# Combine
imei_without_check = tac + serial
# Calculate Luhn check digit
check_digit = self._calculate_luhn(imei_without_check)
return imei_without_check + str(check_digit)
def _calculate_luhn(self, imei: str) -> int:
"""Calculate Luhn check digit"""
digits = [int(d) for d in imei]
odd_sum = sum(digits[-1::-2])
even_sum = sum([sum(divmod(2 * d, 10)) for d in digits[-2::-2]])
return (10 - (odd_sum + even_sum) % 10) % 10
def _generate_mac(self) -> str:
"""Generate valid MAC address"""
# Use common manufacturer prefixes
prefixes = ["00:1B:44", "00:1E:C2", "00:25:00", "28:C6:3F"]
prefix = random.choice(prefixes)
# Generate last 3 octets
suffix = ':'.join([f"{random.randint(0, 255):02X}" for _ in range(3)])
return f"{prefix}:{suffix}"
def save_fingerprints(self):
"""Save fingerprints to file"""
with open(self.fingerprint_file, 'w') as f:
json.dump(self.fingerprints, f, indent=2)
def load_fingerprints(self):
"""Load fingerprints from file"""
try:
with open(self.fingerprint_file, 'r') as f:
self.fingerprints = json.load(f)
except FileNotFoundError:
self.fingerprints = {}
Use a consistent seed when generating device info so your fingerprints remain stable across requests.
Don’t skip this step—it plays a big role in passing fingerprint checks.
Challenge 2: Proxy Rotation
class SmartProxyRotator:
"""Intelligent proxy rotation with health monitoring"""
def __init__(self, proxy_list: List[str]):
self.proxies = self._parse_proxies(proxy_list)
self.proxy_stats = {proxy: {"success": 0, "failure": 0, "last_used": None}
for proxy in self.proxies}
self.blacklist = set()
self.lock = threading.Lock()
def _parse_proxies(self, proxy_list: List[str]) -> List[Dict]:
"""Parse proxy strings into usable format"""
parsed = []
for proxy in proxy_list:
if proxy.startswith("http"):
parsed.append({"http": proxy, "https": proxy})
else:
# Assume format: user:pass@host:port
parsed.append({
"http": f"http://{proxy}",
"https": f"http://{proxy}"
})
return parsed
def get_proxy(self) -> Dict:
"""Get best performing proxy"""
with self.lock:
# Filter out blacklisted proxies
available = [p for p in self.proxies if str(p) not in self.blacklist]
if not available:
# Reset blacklist if all proxies are blacklisted
self.blacklist.clear()
available = self.proxies
# Sort by success rate
def success_rate(proxy):
stats = self.proxy_stats[str(proxy)]
total = stats["success"] + stats["failure"]
if total == 0:
return 0.5 # Neutral for unused proxies
return stats["success"] / total
available.sort(key=success_rate, reverse=True)
# Select proxy (weighted by performance)
if random.random() < 0.8: # 80% chance to use best proxy
proxy = available[0]
else: # 20% chance to explore others
proxy = random.choice(available)
self.proxy_stats[str(proxy)]["last_used"] = datetime.now()
return proxy
def report_success(self, proxy: Dict):
"""Report successful request"""
with self.lock:
self.proxy_stats[str(proxy)]["success"] += 1
def report_failure(self, proxy: Dict):
"""Report failed request"""
with self.lock:
stats = self.proxy_stats[str(proxy)]
stats["failure"] += 1
# Blacklist if failure rate too high
total = stats["success"] + stats["failure"]
if total >= 10 and stats["failure"] / total > 0.7:
self.blacklist.add(str(proxy))
print(f"Blacklisted proxy: {proxy}")
Challenge 3: App Version Monitoring
class AppVersionMonitor:
"""Monitor app versions and adapt to changes"""
def __init__(self):
self.versions = {}
self.update_interval = 3600 # Check hourly
self.last_check = {}
def get_latest_version(self, package_name: str) -> str:
"""Get latest app version from Play Store"""
# Check cache
if package_name in self.versions:
if package_name in self.last_check:
elapsed = (datetime.now() - self.last_check[package_name]).seconds
if elapsed < self.update_interval:
return self.versions[package_name]
try:
# Scrape Play Store
url = f"https://play.google.com/store/apps/details?id={package_name}"
response = requests.get(url, headers={
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
})
# Parse version from HTML
version_pattern = r'<div[^>]*>Current Version</div><span[^>]*>([^<]+)</span>'
match = re.search(version_pattern, response.text)
if match:
version = match.group(1).strip()
self.versions[package_name] = version
self.last_check[package_name] = datetime.now()
return version
except Exception as e:
print(f"Error checking version: {e}")
return self.versions.get(package_name, "1.0.0")
Practical Examples
Complete StockX Scraper
class StockXScraper:
"""Production-ready StockX scraper with PerimeterX bypass"""
def __init__(self, api_key: str):
self.token_generator = TakionAPIGenerator(api_key)
self.session_manager = PerimeterXSessionManager(self.token_generator)
self.base_url = "https://api.stockx.com"
def search_products(self, query: str, limit: int = 50) -> List[Dict]:
"""Search for products on StockX"""
url = f"{self.base_url}/v2/products/search"
params = {
"query": query,
"limit": limit,
"page": 1
}
response = self.session_manager.make_request(url, params=params)
if response.status_code == 200:
return response.json().get("products", [])
else:
print(f"Search failed: {response.status_code}")
return []
def get_product_details(self, product_id: str) -> Dict:
"""Get detailed product information"""
url = f"{self.base_url}/products/{product_id}"
response = self.session_manager.make_request(url)
if response.status_code == 200:
return response.json()
else:
print(f"Failed to get product: {response.status_code}")
return {}
def get_market_data(self, product_id: str) -> Dict:
"""Get current market prices and activity"""
url = f"{self.base_url}/products/{product_id}/market"
response = self.session_manager.make_request(url)
if response.status_code == 200:
return response.json()
else:
return {}
# Usage
scraper = StockXScraper("YOUR_API_KEY")
# Search for Jordan sneakers
products = scraper.search_products("Jordan 1 Retro")
for product in products[:5]:
print(f"Product: {product['title']}")
print(f"Retail: ${product.get('retailPrice', 'N/A')}")
# Get market data
market = scraper.get_market_data(product['id'])
if market:
print(f"Lowest Ask: ${market.get('lowestAsk', 'N/A')}")
print(f"Highest Bid: ${market.get('highestBid', 'N/A')}")
print("-" * 50)
Performance Monitoring
class PerformanceMonitor:
"""Monitor and optimize bypass performance"""
def __init__(self):
self.metrics = {
"requests": 0,
"successes": 0,
"failures": 0,
"response_times": [],
"error_codes": {}
}
def track_request(self, response, elapsed_time: float):
"""Track request metrics"""
self.metrics["requests"] += 1
self.metrics["response_times"].append(elapsed_time)
if response.status_code == 200:
self.metrics["successes"] += 1
else:
self.metrics["failures"] += 1
code = str(response.status_code)
self.metrics["error_codes"][code] = self.metrics["error_codes"].get(code, 0) + 1
def get_stats(self) -> Dict:
"""Get performance statistics"""
if not self.metrics["requests"]:
return {"error": "No requests tracked"}
avg_response_time = sum(self.metrics["response_times"]) / len(self.metrics["response_times"])
return {
"total_requests": self.metrics["requests"],
"success_rate": self.metrics["successes"] / self.metrics["requests"],
"avg_response_time": avg_response_time,
"error_distribution": self.metrics["error_codes"]
}
Troubleshooting Guide
Common Issues and Solutions
Issue | Cause | Solution |
---|---|---|
403 Forbidden | Invalid/expired token | Refresh token before each request batch |
429 Too Many Requests | Rate limiting | Implement exponential backoff, rotate proxies |
Token validation fails | Wrong app_id or version | Verify app_id through network inspection |
Inconsistent success rate | Fingerprint mismatch | Ensure device info remains consistent |
Sudden drop in success | App update | Monitor app versions, update token generation |
Final Tips
- Monitor Everything: Track success rates, response times, and error patterns
- Rotate Intelligently: Don't just rotate proxies randomly - use performance-based selection
- Keep Fingerprints Consistent: Device characteristics should remain stable per session
- Update Regularly: PerimeterX evolves constantly - stay current with detection methods
- Respect Rate Limits: Even with bypasses, aggressive scraping will get you blocked
- Use Multiple Approaches: Combine token generation, TLS fingerprinting, and proxy rotation
Conclusion
Bypassing PerimeterX Mobile requires a multi-layered approach combining valid token generation, TLS fingerprinting, consistent device emulation, and intelligent session management. While the techniques evolve constantly, the fundamentals remain: appear as human as possible, maintain consistency, and adapt quickly to changes.
The methods in this guide should give you a solid foundation for working with PerimeterX-protected apps. Start with the API-based approach for reliability, then experiment with custom implementations as you learn more about your specific target's implementation.