Podcast: Exploring FCC Enforcement Applications of nDPI
Welcome to another episode of our podcast, where we explore the tools and code that power modern network analysis! Today, we’re taking a close look at ndpiSimpleIntegration.c, a comprehensive example from the nDPI project, a popular open-source deep packet inspection toolkit.
ndpiSimpleIntegration.c – Network Traffic Analysis in Action
What is nDPI?
nDPI is a library designed for network traffic classification, protocol detection, and traffic analysis. It’s widely used in network monitoring, security, and research. The project is maintained by ntop.org and is known for its extensibility and performance.
Spotlight: ndpiSimpleIntegration.c
The ndpiSimpleIntegration.c file is more than just a sample: it’s a full-featured integration example that demonstrates how to use nDPI for real-world packet capture and analysis. Here’s what makes it interesting:
- Multi-threaded Packet Processing: The code supports multiple reader threads, each handling its own packet capture workflow. This design allows for efficient use of multi-core systems and high-throughput analysis.
- Protocol Detection: Leveraging nDPI’s detection engine, the example identifies application-layer protocols (like HTTP, TLS, and more) in real time.
- Flow Tracking: The code maintains active and idle flows, tracking their state, timeouts, and protocol classification.
- PCAP Integration: It can process both live network interfaces and offline PCAP files, making it versatile for both monitoring and forensic analysis.
- Extensive Logging: Defining the VERBOSE macro at compile time (it is commented out by default) adds a per-packet log line showing the flow ID, addresses, ports, and Layer 4 payload size.
Why Should You Care?
If you’re interested in network security, traffic engineering, or just want to understand what’s happening on your network, nDPI and this example are invaluable. The code is compact and well-structured, and it demonstrates a practical pattern for integrating DPI into your own tools.
Key Takeaways from the Code
- Initialization: The workflow sets up packet capture, applies BPF filters, and initializes the nDPI detection module.
- Flow Management: Flows are hashed and distributed across threads, with logic to handle timeouts and resource cleanup.
- Protocol Guessing: Even when full detection isn’t possible, the code attempts to guess the protocol, providing useful insights.
- Signal Handling: Graceful shutdown is supported, ensuring resources are freed and statistics are printed.
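The “hashed and distributed across threads” takeaway can be illustrated with a short sketch. This is not the example’s code. The FNV-1a-style mixing, the helper names, and the IPv4-only signature are all assumptions. Only INITIAL_THREAD_HASH and MAX_READER_THREADS come from the source. Ordering the two endpoints before hashing makes the result direction-independent, so both halves of a conversation land on the same reader thread.

```c
#include <assert.h>
#include <stdint.h>

#define INITIAL_THREAD_HASH 0x03dd018b /* seed value taken from ndpiSimpleIntegration.c */
#define MAX_READER_THREADS 4

/* FNV-1a style mixing of one 32-bit word, byte by byte (assumed hash, illustration only). */
static uint32_t mix_u32(uint32_t h, uint32_t v)
{
  for (int i = 0; i < 4; ++i) {
    h ^= (v >> (8 * i)) & 0xFF;
    h *= 16777619u; /* FNV prime */
  }
  return h;
}

/* Hypothetical helper: choose the reader thread for an IPv4 flow. The lower
 * (address, port) endpoint is always hashed first, so A->B and B->A map to
 * the same thread index. */
static unsigned int pick_reader_thread(uint32_t src_ip, uint16_t src_port,
                                       uint32_t dst_ip, uint16_t dst_port,
                                       uint8_t l4_protocol, unsigned int thread_count)
{
  assert(thread_count > 0 && thread_count <= MAX_READER_THREADS);

  int src_first = src_ip < dst_ip || (src_ip == dst_ip && src_port <= dst_port);
  uint32_t lo_ip = src_first ? src_ip : dst_ip;
  uint32_t hi_ip = src_first ? dst_ip : src_ip;
  uint16_t lo_port = src_first ? src_port : dst_port;
  uint16_t hi_port = src_first ? dst_port : src_port;

  uint32_t h = INITIAL_THREAD_HASH;
  h = mix_u32(h, lo_ip);
  h = mix_u32(h, hi_ip);
  h = mix_u32(h, ((uint32_t)lo_port << 16) | hi_port);
  h = mix_u32(h, l4_protocol);
  return h % thread_count;
}
```

In a design like the example’s, each reader thread opens its own pcap handle on the same device or file. A thread would compare the result with its own array_index and skip packets that belong to another thread.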
Final Thoughts
Whether you’re a developer, researcher, or network enthusiast, exploring ndpiSimpleIntegration.c is a great way to learn about deep packet inspection and high-performance network analysis. Check out the nDPI project on GitHub, and don’t forget to listen to our full episode for a line-by-line walkthrough and expert commentary!
The nDPI packet and flow analysis described in the sources covers unencrypted packet information at several layers, as well as derived, flow-specific details.
Here’s a breakdown of the unencrypted information that can be analyzed:
- Ethernet Layer (Layer 2) Information:
  - Ethernet Type (h_proto): The protocol type encapsulated in the Ethernet frame, such as ETH_P_IP (0x0800) for IPv4, ETH_P_IPV6 (0x86DD) for IPv6, and ETH_P_ARP (0x0806) for ARP. Packets with unknown or non-IP/Ethernet datalink types are skipped.
- IP Layer (Layer 3) Information:
  - Layer 3 Type (l3_type): Indicates whether the packet is IPv4 (L3_IP) or IPv6 (L3_IP6).
  - Source IP Address: ip_tuple.v4.src for IPv4 and ip_tuple.v6.src for IPv6, convertible to a human-readable string.
  - Destination IP Address: ip_tuple.v4.dst for IPv4 and ip_tuple.v6.dst for IPv6, also convertible to a human-readable string.
  - IP Protocol (l4_protocol): The protocol number of the next layer (Layer 4), such as TCP, UDP, ICMP, ICMPv6, or the IPv6 Hop-by-Hop Options header.
  - IP Size (ip_size): The size of the IP packet, excluding the Ethernet header.
- Transport Layer (Layer 4) Information:
  - Layer 4 Protocol (l4_protocol): Identifies the transport protocol, specifically distinguishing TCP (IPPROTO_TCP) from UDP (IPPROTO_UDP) for further parsing. It also recognizes ICMP, ICMPv6, and the IPv6 Hop-by-Hop Options header.
  - Source Port (src_port): The port number of the source application.
  - Destination Port (dst_port): The port number of the destination application.
  - TCP Flags (for TCP packets):
    - syn: Indicates whether the SYN flag is set.
    - fin: Indicates whether the FIN flag is set.
    - ack: Indicates whether the ACK flag is set.
  - is_midstream_flow: A flag indicating that a TCP flow started mid-stream (no SYN was seen).
  - flow_fin_ack_seen: A flag indicating that a TCP FIN+ACK packet has been observed for the flow.
  - flow_ack_seen: A flag indicating that a TCP ACK packet has been observed for the flow.
  - total_l4_data_len: The cumulative length of Layer 4 data processed for a flow.
- Derived and Flow-Specific Information:
  - flow_id: A unique identifier assigned to each detected flow.
  - packets_processed: The number of packets processed for a specific flow.
  - Timestamps:
    - first_seen: The timestamp (in milliseconds) when the first packet of the flow was observed.
    - last_seen: The timestamp (in milliseconds) when the most recent packet of the flow was observed.
  - hashval: A calculated hash value for the flow, used for efficient lookup in data structures.
  - detected_l7_protocol: The detected Layer 7 (application) protocol, including its master protocol, application protocol, and category. This is determined through deep packet inspection by the ndpi_detection_process_packet function.
  - guessed_protocol: A protocol guessed as a last resort if definitive detection isn’t possible after a certain number of packets.
  - detection_completed: A flag indicating whether Layer 7 protocol detection has been finalized for the flow.
  - TLS-specific Details (extracted from unencrypted TLS handshake messages):
    - tls_client_hello_seen: Flag indicating whether a TLS Client Hello message has been processed.
    - tls_server_hello_seen: Flag indicating whether the server side of the TLS handshake, including its certificate information, has been processed.
    - SSL/TLS Version: The version of SSL/TLS used, e.g., TLSv1.2, TLSv1.3.
    - Server Name Indication (SNI): The hostname the client is trying to connect to (host_server_name).
    - Advertised ALPNs: Application-Layer Protocol Negotiation values advertised by the client.
    - Certificate Common Name(s): Hostnames listed in the server’s certificate (server_names).
    - Certificate Issuer DN: The distinguished name of the certificate issuer (issuerDN).
    - Certificate Subject DN: The distinguished name of the certificate subject (subjectDN).
  - Flow Information String: A human-readable string containing various details about the flow, obtained via ndpi_get_flow_info.
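To make the Layer 2 to Layer 4 fields above concrete, here is a minimal parser for an untagged Ethernet II frame carrying IPv4 with TCP or UDP. It is a simplified stand-in for what the example does with nDPI’s header structs and ndpi_detection_get_l4, not nDPI’s code. It assumes no VLAN tags and ignores IPv6, and the struct and function names are hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define ETHTYPE_IPV4 0x0800
#define ETH_HDR_LEN 14
#define PROTO_TCP 6
#define PROTO_UDP 17

/* Hypothetical summary of the fields discussed above (IPv4 only). */
struct pkt_summary {
  uint16_t eth_type;      /* h_proto */
  uint32_t src, dst;      /* IPv4 addresses, kept in network byte order */
  uint8_t l4_protocol;    /* IP protocol number */
  uint16_t src_port, dst_port;
  uint8_t syn, fin, ack;  /* TCP flags (all 0 for UDP) */
  size_t l4_data_len;     /* payload bytes after the L4 header */
};

static uint16_t rd16(const uint8_t *p) { return (uint16_t)((p[0] << 8) | p[1]); }

/* Returns 0 on success, -1 if the frame is not IPv4 TCP/UDP or is truncated/malformed. */
static int parse_frame(const uint8_t *frame, size_t len, struct pkt_summary *out)
{
  memset(out, 0, sizeof(*out));
  if (len < ETH_HDR_LEN) return -1;
  out->eth_type = rd16(frame + 12);
  if (out->eth_type != ETHTYPE_IPV4) return -1;       /* ARP, IPv6, ... not handled here */

  const uint8_t *ip = frame + ETH_HDR_LEN;
  size_t ip_len = len - ETH_HDR_LEN;
  if (ip_len < 20 || (ip[0] >> 4) != 4) return -1;
  size_t ihl = (size_t)(ip[0] & 0x0F) * 4;            /* IPv4 header length in bytes */
  size_t total = rd16(ip + 2);                        /* IPv4 total length field */
  if (ihl < 20 || total < ihl || total > ip_len) return -1;
  memcpy(&out->src, ip + 12, 4);
  memcpy(&out->dst, ip + 16, 4);
  out->l4_protocol = ip[9];

  const uint8_t *l4 = ip + ihl;
  size_t l4_len = total - ihl;
  if (out->l4_protocol == PROTO_TCP) {
    if (l4_len < 20) return -1;                       /* malformed TCP */
    size_t doff = (size_t)(l4[12] >> 4) * 4;          /* TCP data offset */
    if (doff < 20 || doff > l4_len) return -1;
    out->fin = l4[13] & 0x01;
    out->syn = (l4[13] >> 1) & 0x01;
    out->ack = (l4[13] >> 4) & 0x01;
    out->l4_data_len = l4_len - doff;
  } else if (out->l4_protocol == PROTO_UDP) {
    if (l4_len < 8) return -1;                        /* malformed UDP */
    out->l4_data_len = l4_len - 8;
  } else {
    return -1;
  }
  out->src_port = rd16(l4);                           /* same offsets for TCP and UDP */
  out->dst_port = rd16(l4 + 2);
  return 0;
}
```

A SYN with the ACK bit clear marks the start of a TCP handshake. A flow whose first packet lacks it is what the example flags as is_midstream_flow.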
Let’s delve deeper into how nDPI analyzes unencrypted network packet information, building upon our previous discussion. The process involves a multi-layered approach, from basic packet parsing to sophisticated flow tracking and application-layer protocol detection.
Here is a closer look at each stage of the analysis, based on the sources:
- Packet Capture and Initial Filtering
  - The nDPI_workflow structure manages the packet capture process. If the argument does not name an existing file (access() fails with ENOENT), it is opened as a live interface with pcap_open_live; otherwise it is read as a PCAP file with pcap_open_offline (pcap_open_offline_with_tstamp_precision on non-Windows builds).
  - A Berkeley Packet Filter (BPF) is applied with the expression “ip or ip6”, so capture is restricted to IPv4 and IPv6 traffic at Layer 3. Packets with other datalink types, or those too short to contain the essential headers (Ethernet, IP), are explicitly skipped.
- Layer-by-Layer Parsing (ndpi_process_packet)
  - Each captured packet is passed to the ndpi_process_packet function for analysis.
  - Datalink Layer (Layer 2): The function first determines the datalink type (e.g., DLT_EN10MB for Ethernet) to correctly interpret the packet’s structure. It then identifies the Ethernet type (h_proto), such as ETH_P_IP (0x0800) for IPv4 or ETH_P_IPV6 (0x86DD) for IPv6. ARP packets (ETH_P_ARP) are explicitly skipped.
  - Network Layer (Layer 3): The packet is then parsed as an IPv4 (ndpi_iphdr) or IPv6 (ndpi_ipv6hdr) packet. Key information extracted includes:
    - Source and Destination IP Addresses: saddr/daddr for IPv4 and ip6_src/ip6_dst for IPv6, which are stored in the ip_tuple union within the nDPI_flow_info structure. These are later converted to human-readable strings for output.
    - IP Size: The total length of the IP packet, excluding the Ethernet header.
    - Layer 4 Protocol: The ndpi_detection_get_l4 function is called to identify the next layer’s protocol (e.g., TCP, UDP, ICMP, ICMPv6) and locate its starting pointer (l4_ptr) and length (l4_len).
  - Transport Layer (Layer 4): Based on the identified Layer 4 protocol:
    - For TCP (IPPROTO_TCP), the source (source) and destination (dest) ports are extracted. Importantly, TCP flags like SYN, FIN, and ACK are examined. These flags inform the is_midstream_flow (if SYN was not seen), flow_fin_ack_seen (if FIN+ACK was observed), and flow_ack_seen flags in the nDPI_flow_info structure, which are critical for understanding the flow’s state and lifecycle.
    - For UDP (IPPROTO_UDP), the source (source) and destination (dest) ports are similarly extracted.
- Flow Identification and Management
  - The example organizes packets into “flows,” each represented by an nDPI_flow_info structure.
  - Flow Hashing: A hashval is calculated for each packet from its Layer 4 protocol, source/destination IP addresses, and source/destination ports. The hash selects one of the MAX_FLOW_ROOTS_PER_THREAD (2048) tree roots kept per thread. Flows are then located in, or inserted into, that binary search tree with ndpi_tfind and ndpi_tsearch.
  - Symmetry Handling: Since network flows can be bidirectional, if a flow isn’t found using the original source/destination, the lookup is retried with the source and destination IP addresses and ports swapped. This ensures that both directions of a single conversation map to the same flow entry.
  - Flow State Tracking:
    - Each flow is assigned a unique flow_id.
    - packets_processed tracks the number of packets belonging to a flow.
    - total_l4_data_len accumulates the total Layer 4 data length for the flow.
    - first_seen and last_seen timestamps record the beginning and end of observed activity for a flow, which is crucial for timeout handling.
  - Idle Flow Management: A check_for_idle_flows routine scans the active flows every IDLE_SCAN_PERIOD (10 seconds). Flows are marked as idle and queued for freeing if a TCP FIN+ACK sequence has been observed or if more than MAX_IDLE_TIME (300 seconds, i.e., 5 minutes) has passed since their last_seen timestamp.
- Application Layer Protocol Detection (Layer 7 DPI)
  - The core of nDPI’s analysis is the ndpi_detection_process_packet function, which performs deep packet inspection to identify the application-layer protocol (detected_l7_protocol). The result records a master_protocol and an app_protocol (protocol IDs such as NDPI_PROTOCOL_HTTP or NDPI_PROTOCOL_FACEBOOK) together with a category.
  - Detection Completion: Once a protocol is definitively detected, the detection_completed flag is set, and flow information (including the detected protocol name and category) is printed.
  - Protocol Guessing: If a protocol cannot be conclusively identified after a certain number of packets (e.g., 0xFF packets), ndpi_detection_giveup attempts to produce a guessed_protocol based on heuristics, providing a “last chance” classification.
- TLS Handshake Information Extraction (Unencrypted Parts)
  - Even for encrypted traffic like TLS, the initial handshake messages contain unencrypted metadata that nDPI can analyze.
  - Client Hello: When a TLS Client Hello message is processed, nDPI extracts and reports the SSL/TLS version (ssl_version), the Server Name Indication (SNI), which is the hostname the client is trying to connect to (host_server_name), and the advertised ALPNs (Application-Layer Protocol Negotiation values). These are key pieces of information visible without decrypting the payload.
  - Server Hello (Certificate Information): Once the server side of the handshake has been processed, and if certificate information is available, nDPI can report the SSL/TLS version, the Common Name(s) from the server’s certificate (server_names), the Issuer Distinguished Name (DN) (issuerDN), and the Subject DN (subjectDN). This allows identification of the server and its certificate authority. In TLS 1.3 the certificate is sent encrypted, so these fields are generally only available for TLS 1.2 and earlier.
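The following sketch shows why the SNI is readable without decryption. It is a minimal, hypothetical parser that walks a TLS record holding a ClientHello and copies out the server_name extension, following the wire format in RFC 8446 and RFC 6066. It illustrates the format and is not nDPI’s TLS dissector. It assumes the whole ClientHello fits in one record and returns only the first host_name entry.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Bounds-checked cursor over a byte buffer. */
struct cursor { const uint8_t *p; size_t left; };

static int take(struct cursor *c, size_t n, const uint8_t **out)
{
  if (c->left < n) return -1;
  if (out) *out = c->p;
  c->p += n;
  c->left -= n;
  return 0;
}

/* Read an n-byte big-endian unsigned integer. */
static int take_uint(struct cursor *c, size_t n, size_t *value)
{
  const uint8_t *b;
  if (take(c, n, &b) != 0) return -1;
  *value = 0;
  for (size_t i = 0; i < n; ++i) *value = (*value << 8) | b[i];
  return 0;
}

/* Skip a vector whose length prefix is len_bytes wide. */
static int skip_vector(struct cursor *c, size_t len_bytes)
{
  size_t n;
  return take_uint(c, len_bytes, &n) == 0 ? take(c, n, NULL) : -1;
}

/* Hypothetical helper: copy the SNI host name into sni (NUL-terminated).
 * Returns 0 on success, -1 if not a ClientHello, truncated, or no SNI. */
static int extract_sni(const uint8_t *rec, size_t len, char *sni, size_t sni_size)
{
  struct cursor c = { rec, len };
  size_t v, ext_total;
  const uint8_t *ignored;

  if (take_uint(&c, 1, &v) != 0 || v != 0x16) return -1;  /* record type: handshake */
  if (take(&c, 2, &ignored) != 0) return -1;               /* legacy record version */
  if (take_uint(&c, 2, &v) != 0 || v > c.left) return -1;  /* record length */
  c.left = v;
  if (take_uint(&c, 1, &v) != 0 || v != 0x01) return -1;   /* handshake type: ClientHello */
  if (take_uint(&c, 3, &v) != 0 || v > c.left) return -1;  /* handshake length */
  c.left = v;
  if (take(&c, 2 + 32, &ignored) != 0) return -1;          /* client_version + random */
  if (skip_vector(&c, 1) != 0 || skip_vector(&c, 2) != 0 || skip_vector(&c, 1) != 0)
    return -1;                                             /* session_id, cipher_suites, compression */
  if (take_uint(&c, 2, &ext_total) != 0 || ext_total > c.left) return -1;
  c.left = ext_total;

  while (c.left > 0) {
    size_t type, ext_len;
    const uint8_t *ext;
    if (take_uint(&c, 2, &type) != 0 || take_uint(&c, 2, &ext_len) != 0 ||
        take(&c, ext_len, &ext) != 0) return -1;
    if (type != 0x0000) continue;                          /* not server_name */

    struct cursor e = { ext, ext_len };
    size_t list_len, name_type, name_len;
    const uint8_t *name;
    if (take_uint(&e, 2, &list_len) != 0 || list_len > e.left) return -1;
    e.left = list_len;
    if (take_uint(&e, 1, &name_type) != 0 || name_type != 0) return -1; /* host_name */
    if (take_uint(&e, 2, &name_len) != 0 || take(&e, name_len, &name) != 0) return -1;
    if (name_len + 1 > sni_size) return -1;
    memcpy(sni, name, name_len);
    sni[name_len] = '\0';
    return 0;
  }
  return -1;                                               /* no SNI extension present */
}
```

Every length field is checked against the bytes that remain, which is the same defensive posture any DPI engine needs when parsing untrusted traffic.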
nDPI provides a comprehensive framework for analyzing unencrypted network packet information by meticulously parsing each layer, managing flows based on extracted tuples, intelligently identifying application protocols, and extracting valuable metadata even from the unencrypted portions of otherwise encrypted protocols like TLS. This granular analysis is crucial for network monitoring, security, and traffic shaping applications.
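The flow bookkeeping summarized above can be sketched with the POSIX tsearch/tfind functions from <search.h>, which play the role that ndpi_tsearch/ndpi_tfind play in the example. The steps are: hash the tuple into one of many tree roots, look the flow up, retry with source and destination swapped, and otherwise insert a new flow. This is an illustrative sketch, not the example’s code. It assumes IPv4 only and a simple additive hash, and the struct and function names are hypothetical. The two constants are taken from the source.

```c
#define _XOPEN_SOURCE 700 /* tsearch()/tfind() are XSI functions */
#include <assert.h>
#include <search.h>
#include <stdint.h>
#include <stdlib.h>

#define MAX_FLOW_ROOTS_PER_THREAD 2048 /* value from the example */
#define MAX_IDLE_TIME 300000           /* msec, value from the example */

/* Hypothetical, IPv4-only flow record. */
struct flow {
  uint32_t src_ip, dst_ip;
  uint16_t src_port, dst_port;
  uint8_t l4_protocol;
  uint8_t fin_ack_seen;
  unsigned long long packets_processed;
  uint64_t last_seen; /* msec */
};

static void *flow_roots[MAX_FLOW_ROOTS_PER_THREAD];

/* Direction-sensitive ordering; the caller handles the reverse direction. */
static int flow_cmp(const void *a, const void *b)
{
  const struct flow *x = a, *y = b;
  if (x->l4_protocol != y->l4_protocol) return x->l4_protocol < y->l4_protocol ? -1 : 1;
  if (x->src_ip != y->src_ip) return x->src_ip < y->src_ip ? -1 : 1;
  if (x->dst_ip != y->dst_ip) return x->dst_ip < y->dst_ip ? -1 : 1;
  if (x->src_port != y->src_port) return x->src_port < y->src_port ? -1 : 1;
  if (x->dst_port != y->dst_port) return x->dst_port < y->dst_port ? -1 : 1;
  return 0;
}

/* A sum is symmetric in src/dst, so both directions select the same tree. */
static size_t flow_bucket(const struct flow *f)
{
  uint64_t h = (uint64_t)f->src_ip + f->dst_ip + f->src_port + f->dst_port + f->l4_protocol;
  return (size_t)(h % MAX_FLOW_ROOTS_PER_THREAD);
}

/* Find the flow in either direction, or insert a copy of key as a new flow. */
static struct flow *get_or_create_flow(const struct flow *key, uint64_t now_ms)
{
  assert(key != NULL);
  void **root = &flow_roots[flow_bucket(key)];
  void *hit = tfind(key, root, flow_cmp);

  if (hit == NULL) {
    struct flow reversed = *key; /* retry with src <-> dst swapped */
    reversed.src_ip = key->dst_ip;
    reversed.dst_ip = key->src_ip;
    reversed.src_port = key->dst_port;
    reversed.dst_port = key->src_port;
    hit = tfind(&reversed, root, flow_cmp);
  }
  if (hit == NULL) {
    struct flow *fresh = malloc(sizeof(*fresh));
    if (fresh == NULL) return NULL;
    *fresh = *key;
    fresh->packets_processed = 0;
    hit = tsearch(fresh, root, flow_cmp);
    if (hit == NULL) { free(fresh); return NULL; }
  }

  struct flow *f = *(struct flow **)hit; /* a tree node points at our record */
  f->packets_processed++;
  f->last_seen = now_ms;
  return f;
}

/* Idle rule as described in the text: FIN+ACK seen, or silent for > MAX_IDLE_TIME ms. */
static int flow_is_idle(const struct flow *f, uint64_t now_ms)
{
  return f->fin_ack_seen || f->last_seen + MAX_IDLE_TIME < now_ms;
}
```

Splitting one big tree into many small ones keeps each lookup shallow. The swapped retry lets the comparison function stay simple and direction-sensitive.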
nDPI’s capabilities in analyzing unencrypted network packet information can indirectly, yet significantly, assist the FCC in its Spectrum Enforcement efforts. While nDPI does not directly analyze radio frequencies or physical layer characteristics, it provides deep visibility into the types of traffic and applications utilizing the network infrastructure that operates over the radio spectrum. This allows for the identification of potential misuse, unauthorized services, or unusual patterns that might violate spectrum regulations or contribute to interference.
Here’s how nDPI can help:
- Identification of Application Protocols and Services:
  - nDPI can detect the Layer 7 (application) protocol (detected_l7_protocol) for each network flow, including its master protocol, application protocol, and category. This matters because specific spectrum allocations are often granted for particular types of services (e.g., licensed radio for private voice, broadband data).
  - If, for example, a licensed frequency band is authorized only for specific data applications, nDPI could identify whether unauthorized or unapproved applications (such as large-scale streaming, peer-to-peer file sharing, or non-compliant VPNs) are operating over that spectrum. This can indicate a violation of the license terms or a deviation from the intended use of the spectrum.
  - Even for protocols that are encrypted at the payload level, such as TLS, nDPI can extract unencrypted metadata from the handshake. This includes the Server Name Indication (SNI), which indicates the hostname the client intends to connect to. Where the certificate is sent in the clear (TLS 1.2 and earlier), it also includes certificate details such as the Common Name(s), Issuer DN, and Subject DN. This allows the FCC to understand what services are being accessed even if the content is encrypted, providing insight into the nature of the traffic utilizing the spectrum.
- Analysis of Traffic Patterns and Volume:
  - The example tracks per-flow statistics such as packets_processed and total_l4_data_len. By monitoring these metrics, the FCC can ascertain the volume and intensity of data transmission associated with specific flows or applications.
  - Unusual spikes in data volume or sustained high-bandwidth flows for certain applications could indicate inefficient or unauthorized spectrum usage, especially on spectrum bands with limited capacity or specific usage restrictions. For instance, an unexpected surge of video streaming traffic on a band allocated for low-bandwidth telemetry could signal a regulatory compliance issue.
- Endpoint Identification and Attribution:
  - Each flow recorded by nDPI includes the source and destination IP addresses (ip_tuple) for both IPv4 and IPv6, as well as source and destination ports (src_port, dst_port).
  - This detailed addressing information allows the FCC to identify the communicating devices or entities involved in specific traffic flows. If a particular IP address or port is associated with a device or service that is known to violate spectrum regulations (e.g., an unauthorized broadcaster or a device causing interference), nDPI can pinpoint its network activity.
  - The ability to convert IP tuples to human-readable strings further aids in this identification.
- Detection of Unusual or Malformed Traffic:
  - nDPI logs instances of “Malformed TCP packet” or “Malformed UDP packet”, and can identify unknown Layer 4 protocols. While these aren’t direct spectrum violations, such anomalies in network traffic could potentially be symptoms of misconfigured devices, network attacks, or other issues that might contribute to unintentional interference or inefficient spectrum use.
  - The is_midstream_flow flag for TCP packets can highlight sessions where the SYN handshake was not observed, which might be indicative of unusual connection patterns.
- Evidence Collection and Reporting:
  - For each flow, the example records a unique flow_id, timestamps (first_seen, last_seen), and the detected protocol, and ndpi_get_flow_info supplies a human-readable information string for the flow.
  - This data can serve as forensic evidence that the FCC can use in investigations, enforcement actions, and demonstrations of non-compliance with spectrum licenses or regulations. It provides a granular view of what traffic was transmitted, when, and between whom, supporting a robust enforcement process.
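A flow record like the one described above could be written to an evidence log as one line per flow, with the millisecond timestamps converted to UTC. This is a minimal sketch. The record layout, the field order, and the helper names are assumptions, not an nDPI or FCC format. The inputs stand in for values the example keeps in nDPI_flow_info, plus the address strings from ip_tuple_to_string and a protocol name.

```c
#define _POSIX_C_SOURCE 200809L /* for gmtime_r() */
#include <assert.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <time.h>

/* Format a millisecond Unix timestamp as ISO 8601 UTC, e.g. 1970-01-01T00:00:00.000Z.
 * Returns 0 on success, -1 on failure or truncation. */
static int format_ms_utc(uint64_t ms, char *out, size_t out_size)
{
  time_t secs = (time_t)(ms / 1000);
  struct tm tm_utc;
  if (gmtime_r(&secs, &tm_utc) == NULL) return -1;
  char base[32];
  if (strftime(base, sizeof(base), "%Y-%m-%dT%H:%M:%S", &tm_utc) == 0) return -1;
  int n = snprintf(out, out_size, "%s.%03uZ", base, (unsigned)(ms % 1000));
  return (n > 0 && (size_t)n < out_size) ? 0 : -1;
}

/* Hypothetical evidence line: id, first/last seen, endpoints, protocol, L4 volume. */
static int format_flow_record(char *out, size_t out_size,
                              uint32_t flow_id, uint64_t first_seen, uint64_t last_seen,
                              const char *src, uint16_t src_port,
                              const char *dst, uint16_t dst_port,
                              const char *l7_name, unsigned long long l4_bytes)
{
  char t0[40], t1[40];
  if (format_ms_utc(first_seen, t0, sizeof(t0)) != 0 ||
      format_ms_utc(last_seen, t1, sizeof(t1)) != 0) return -1;
  int n = snprintf(out, out_size,
                   "flow=%u first=%s last=%s %s:%u -> %s:%u proto=%s l4_bytes=%llu",
                   (unsigned)flow_id, t0, t1, src, (unsigned)src_port,
                   dst, (unsigned)dst_port, l7_name, l4_bytes);
  return (n > 0 && (size_t)n < out_size) ? 0 : -1;
}
```

Logging UTC rather than local time avoids ambiguity when captures from different sites are compared later.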
In essence, nDPI acts as a powerful analytical tool for understanding the “who, what, and when” of network traffic, providing the FCC with critical insights into how the radio spectrum is being utilized at the application level, thereby supporting effective spectrum enforcement.
Let’s continue to explore how nDPI’s capabilities can be a valuable asset to the FCC in its Spectrum Enforcement initiatives, building on our previous discussion and the provided source material.
nDPI, by providing deep visibility into network traffic, offers several ways to assist the FCC, even though it doesn’t directly interact with radio frequencies:
- Precise Identification of Application Protocols and Services (Layer 7 DPI): nDPI’s core strength lies in its ability to identify the application-layer protocol (detected_l7_protocol) for each network flow, classified by its master_protocol, app_protocol, and category. This is helpful for the FCC because:
  - Spectrum licenses often dictate specific types of services or applications that are permitted to operate within a given frequency band. For instance, a band might be allocated for low-bandwidth telemetry or specific private network communications. If nDPI identifies high-bandwidth video streaming (e.g., YouTube, Netflix) or peer-to-peer file sharing (e.g., BitTorrent) applications on a band licensed for other purposes, it could indicate a violation of license terms or unauthorized use.
  - The ndpi_detection_process_packet function performs this deep packet inspection, and once a protocol is definitively detected, the detection_completed flag is set. If a protocol cannot be conclusively identified after a certain number of packets (e.g., 0xFF or 0xFE), the example calls ndpi_detection_giveup to obtain a guessed_protocol, providing “last chance” insight into the nature of the traffic.
- Extraction of Unencrypted TLS Handshake Metadata: Even for encrypted traffic, nDPI can extract crucial unencrypted information from the initial TLS handshake, which is vital for the FCC’s understanding of the services operating over the spectrum.
  - From a TLS Client Hello, nDPI can report the SSL/TLS version (ssl_version), the Server Name Indication (SNI) (host_server_name), and Advertised ALPNs (Application-Layer Protocol Negotiation values). The SNI, in particular, reveals the hostname the client is trying to connect to, allowing the FCC to understand which online service is being accessed. This helps in identifying whether an entity is, for example, attempting to connect to unauthorized or banned services over a regulated spectrum.
  - From a TLS Server Hello, if certificate information is available, nDPI can report the SSL/TLS version, Common Name(s) (server_names) from the server’s certificate, the Issuer Distinguished Name (DN) (issuerDN), and the Subject Distinguished Name (DN) (subjectDN). This information helps the FCC identify the actual server and the certificate authority that issued its certificate, providing more context about the entities and services utilizing the spectrum.
- Analysis of Traffic Patterns, Volume, and Flow State: The example tracks detailed statistics for each flow (nDPI_flow_info structure), offering insights into how the spectrum is being utilized in terms of intensity and duration:
  - Traffic Volume and Intensity: Each flow records packets_processed and total_l4_data_len. By aggregating this data, the FCC can detect unusual spikes in data volume or sustained high-bandwidth usage. For instance, if a spectrum band is designated for intermittent, low-data-rate communications, and nDPI consistently identifies large volumes of data for video or file transfers, it signals a potential deviation from the permitted use.
  - Flow State and Anomalies: The nDPI_flow_info structure includes flags such as is_midstream_flow (a TCP flow whose SYN handshake was not observed, which could be unusual) and flow_fin_ack_seen (the end of a TCP connection). Reports of a “Malformed TCP packet”, a “Malformed UDP packet”, or an unknown Layer 4 protocol could point to misconfigured equipment, unapproved proprietary protocols, or even malicious activity that might contribute to inefficient spectrum use or interference.
- Endpoint Identification and Attribution: Every flow processed by nDPI contains the source and destination IP addresses (ip_tuple, for both IPv4 and IPv6) and source and destination ports (src_port, dst_port).
  - The ip_tuple_to_string function allows these IP addresses to be converted into human-readable strings. This granular addressing information enables the FCC to pinpoint the specific devices or entities involved in suspicious network activity. If an IP address is linked to an unauthorized user or a device known to cause interference, nDPI’s data can provide direct evidence of its network communications. The unique flow_id assigned to each flow further aids in tracking specific conversations.
- Comprehensive Evidence Collection and Reporting: The example records the detected protocol, timestamps (first_seen, last_seen), and unique flow ID for each flow, and ndpi_get_flow_info supplies a human-readable information string for it.
  - This data can serve as forensic evidence for the FCC. It provides a clear, granular record of “what” traffic was transmitted, “when” it occurred (time_ms, first_seen, last_seen), “between whom” (src_addr_str, dst_addr_str, src_port, dst_port), and the “application” responsible for the traffic. This level of detail is crucial for documenting non-compliance, issuing warnings, or taking enforcement actions against entities misusing the radio spectrum.
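The license-compliance reasoning in this section can be expressed as a small post-processing step over per-flow results. The pattern is an application that a band’s authorization does not cover, yet which carries a large share of the traffic. This is a hypothetical sketch. The allowlist, the percentage threshold, and the record type are assumptions for illustration. Real license terms would have to be translated into such a policy by an analyst, and the protocol names would come from nDPI’s detected protocol.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical per-flow result: protocol name plus Layer 4 byte count. */
struct flow_result {
  const char *l7_name;
  unsigned long long l4_bytes;
};

static int is_permitted(const char *name, const char *const *allow, size_t n_allow)
{
  for (size_t i = 0; i < n_allow; ++i)
    if (strcmp(name, allow[i]) == 0) return 1;
  return 0;
}

/* Returns the bytes carried by applications outside the allowlist, and sets
 * *flag when that share exceeds max_share_pct percent of all bytes. */
static unsigned long long unpermitted_bytes(const struct flow_result *flows, size_t n_flows,
                                            const char *const *allow, size_t n_allow,
                                            unsigned max_share_pct, int *flag)
{
  assert(flag != NULL);
  unsigned long long total = 0, outside = 0;
  for (size_t i = 0; i < n_flows; ++i) {
    total += flows[i].l4_bytes;
    if (!is_permitted(flows[i].l7_name, allow, n_allow)) outside += flows[i].l4_bytes;
  }
  /* outside / total > pct / 100, compared in integers to avoid floating point */
  *flag = total > 0 && outside * 100 > total * max_share_pct;
  return outside;
}
```

A byte-share test like this is only a starting point for an investigation. It does not by itself establish a violation.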
Code in Review:
/*
*
* Copyright (C) 2011-25 - ntop.org
*
* nDPI is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* nDPI is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with nDPI. If not, see <http://www.gnu.org/licenses/>.
*
*/
#ifndef WIN32
#include <arpa/inet.h>
#include <netinet/in.h>
#endif
#include <errno.h>
#include <ndpi_api.h>
#include <ndpi_main.h>
#include <ndpi_typedefs.h>
#include <pcap/pcap.h>
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#ifdef WIN32
#include <windows.h>
#endif
//#define VERBOSE 1
#define MAX_FLOW_ROOTS_PER_THREAD 2048
#define MAX_IDLE_FLOWS_PER_THREAD 64
#define TICK_RESOLUTION 1000
#define MAX_READER_THREADS 4
#define IDLE_SCAN_PERIOD 10000 /* msec */
#define MAX_IDLE_TIME 300000 /* msec */
#define INITIAL_THREAD_HASH 0x03dd018b
#ifndef ETH_P_IP
#define ETH_P_IP 0x0800
#endif
#ifndef ETH_P_IPV6
#define ETH_P_IPV6 0x86DD
#endif
#ifndef ETH_P_ARP
#define ETH_P_ARP 0x0806
#endif
enum nDPI_l3_type {
L3_IP, L3_IP6
};
struct nDPI_flow_info {
uint32_t flow_id;
unsigned long long int packets_processed;
uint64_t first_seen;
uint64_t last_seen;
uint64_t hashval;
enum nDPI_l3_type l3_type;
union {
struct {
uint32_t src;
uint32_t pad_00[3];
uint32_t dst;
uint32_t pad_01[3];
} v4;
struct {
uint64_t src[2];
uint64_t dst[2];
} v6;
struct {
uint32_t src[4];
uint32_t dst[4];
} u32;
} ip_tuple;
unsigned long long int total_l4_data_len;
uint16_t src_port;
uint16_t dst_port;
uint8_t is_midstream_flow:1;
uint8_t flow_fin_ack_seen:1;
uint8_t flow_ack_seen:1;
uint8_t detection_completed:1;
uint8_t tls_client_hello_seen:1;
uint8_t tls_server_hello_seen:1;
uint8_t flow_info_printed:1;
uint8_t reserved_00:1;
uint8_t l4_protocol;
struct ndpi_proto detected_l7_protocol;
struct ndpi_proto guessed_protocol;
struct ndpi_flow_struct * ndpi_flow;
};
struct nDPI_workflow {
pcap_t * pcap_handle;
volatile long int error_or_eof;
unsigned long long int packets_captured;
unsigned long long int packets_processed;
unsigned long long int total_l4_data_len;
unsigned long long int detected_flow_protocols;
uint64_t last_idle_scan_time;
uint64_t last_time;
void ** ndpi_flows_active;
unsigned long long int max_active_flows;
unsigned long long int cur_active_flows;
unsigned long long int total_active_flows;
void ** ndpi_flows_idle;
unsigned long long int max_idle_flows;
unsigned long long int cur_idle_flows;
unsigned long long int total_idle_flows;
struct ndpi_detection_module_struct * ndpi_struct;
};
struct nDPI_reader_thread {
struct nDPI_workflow * workflow;
pthread_t thread_id;
uint32_t array_index;
};
static struct nDPI_reader_thread reader_threads[MAX_READER_THREADS]; /* static storage is zero-initialized; "= {}" is not portable C before C23 */
static int reader_thread_count = MAX_READER_THREADS;
static volatile long int main_thread_shutdown = 0;
static volatile long int flow_id = 0;
static void free_workflow(struct nDPI_workflow ** const workflow);
static struct nDPI_workflow * init_workflow(char const * const file_or_device)
{
char pcap_error_buffer[PCAP_ERRBUF_SIZE];
struct nDPI_workflow * workflow = (struct nDPI_workflow *)ndpi_calloc(1, sizeof(*workflow));
const char *bpfFilter = "ip or ip6";
static struct bpf_program bpf_code;
static struct bpf_program *bpf_cfilter = NULL;
if (workflow == NULL) {
return NULL;
}
if (access(file_or_device, R_OK) != 0 && errno == ENOENT) {
workflow->pcap_handle = pcap_open_live(file_or_device, /* 1536 */ 65535, 1, 250, pcap_error_buffer);
} else {
#ifdef WIN32
workflow->pcap_handle = pcap_open_offline(file_or_device, pcap_error_buffer);
#else
workflow->pcap_handle = pcap_open_offline_with_tstamp_precision(file_or_device, PCAP_TSTAMP_PRECISION_MICRO,
pcap_error_buffer);
#endif
}
if (workflow->pcap_handle == NULL) {
fprintf(stderr, "pcap_open_live / pcap_open_offline: %.*s\n",
(int) PCAP_ERRBUF_SIZE, pcap_error_buffer);
free_workflow(&workflow);
return NULL;
}
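if (pcap_compile(workflow->pcap_handle, &bpf_code, bpfFilter, 1, 0xFFFFFF00) < 0) {
fprintf(stderr, "pcap_compile error: '%s'\n", pcap_geterr(workflow->pcap_handle));
/* clean up and let the caller decide, instead of exit()ing from a helper */
free_workflow(&workflow);
return NULL;
}
bpf_cfilter = &bpf_code;
if (pcap_setfilter(workflow->pcap_handle, bpf_cfilter) < 0) {
fprintf(stderr, "pcap_setfilter error: '%s'\n", pcap_geterr(workflow->pcap_handle));
}
/* pcap_setfilter() keeps its own copy of the program, so release the compiled code */
pcap_freecode(&bpf_code);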
workflow->ndpi_struct = ndpi_init_detection_module(NULL);
if (workflow->ndpi_struct == NULL) {
free_workflow(&workflow);
return NULL;
}
workflow->total_active_flows = 0;
workflow->max_active_flows = MAX_FLOW_ROOTS_PER_THREAD;
workflow->ndpi_flows_active = (void **)ndpi_calloc(workflow->max_active_flows, sizeof(void *));
if (workflow->ndpi_flows_active == NULL) {
free_workflow(&workflow);
return NULL;
}
workflow->total_idle_flows = 0;
workflow->max_idle_flows = MAX_IDLE_FLOWS_PER_THREAD;
workflow->ndpi_flows_idle = (void **)ndpi_calloc(workflow->max_idle_flows, sizeof(void *));
if (workflow->ndpi_flows_idle == NULL) {
free_workflow(&workflow);
return NULL;
}
NDPI_PROTOCOL_BITMASK protos;
NDPI_BITMASK_SET_ALL(protos);
ndpi_set_protocol_detection_bitmask2(workflow->ndpi_struct, &protos);
ndpi_finalize_initialization(workflow->ndpi_struct);
return workflow;
}
static void ndpi_flow_info_freer(void * const node)
{
struct nDPI_flow_info * const flow = (struct nDPI_flow_info *)node;
ndpi_flow_free(flow->ndpi_flow);
ndpi_free(flow);
}
static void free_workflow(struct nDPI_workflow ** const workflow)
{
struct nDPI_workflow * const w = *workflow;
size_t i;
if (w == NULL) {
return;
}
if (w->pcap_handle != NULL) {
pcap_close(w->pcap_handle);
w->pcap_handle = NULL;
}
if (w->ndpi_struct != NULL) {
ndpi_exit_detection_module(w->ndpi_struct);
}
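/* ndpi_flows_active may still be NULL if init_workflow() failed part-way */
for (i = 0; w->ndpi_flows_active != NULL && i < w->max_active_flows; i++) {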
ndpi_tdestroy(w->ndpi_flows_active[i], ndpi_flow_info_freer);
}
ndpi_free(w->ndpi_flows_active);
ndpi_free(w->ndpi_flows_idle);
ndpi_free(w);
*workflow = NULL;
}
static char * get_default_pcapdev(char *errbuf)
{
char * ifname;
pcap_if_t * all_devices = NULL;
if (pcap_findalldevs(&all_devices, errbuf) != 0)
{
return NULL;
}
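if (all_devices == NULL) {
/* pcap_findalldevs() can succeed and still return an empty device list */
snprintf(errbuf, PCAP_ERRBUF_SIZE, "no capture device found");
return NULL;
}
ifname = strdup(all_devices->name);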
pcap_freealldevs(all_devices);
return ifname;
}
static int setup_reader_threads(char const * const file_or_device)
{
char * file_or_default_device;
char pcap_error_buffer[PCAP_ERRBUF_SIZE];
int i;
if (reader_thread_count > MAX_READER_THREADS) {
return 1;
}
if (file_or_device == NULL) {
file_or_default_device = get_default_pcapdev(pcap_error_buffer);
if (file_or_default_device == NULL) {
fprintf(stderr, "pcap_findalldevs: %.*s\n", (int) PCAP_ERRBUF_SIZE, pcap_error_buffer);
return 1;
}
} else {
file_or_default_device = strdup(file_or_device);
if (file_or_default_device == NULL) {
return 1;
}
}
for (i = 0; i < reader_thread_count; ++i) {
reader_threads[i].workflow = init_workflow(file_or_default_device);
if (reader_threads[i].workflow == NULL)
{
free(file_or_default_device);
return 1;
}
}
free(file_or_default_device);
return 0;
}
static int ip_tuple_to_string(struct nDPI_flow_info const * const flow,
char * const src_addr_str, size_t src_addr_len,
char * const dst_addr_str, size_t dst_addr_len)
{
switch (flow->l3_type) {
case L3_IP:
/* inet_ntop() reads the raw in_addr / in6_addr bytes that ip_tuple stores,
 * so no sockaddr cast (which also discarded const) is needed */
return inet_ntop(AF_INET, &flow->ip_tuple.v4.src,
src_addr_str, src_addr_len) != NULL &&
inet_ntop(AF_INET, &flow->ip_tuple.v4.dst,
dst_addr_str, dst_addr_len) != NULL;
case L3_IP6:
return inet_ntop(AF_INET6, &flow->ip_tuple.v6.src[0],
src_addr_str, src_addr_len) != NULL &&
inet_ntop(AF_INET6, &flow->ip_tuple.v6.dst[0],
dst_addr_str, dst_addr_len) != NULL;
}
return 0;
}
#ifdef VERBOSE
static void print_packet_info(struct nDPI_reader_thread const * const reader_thread,
struct pcap_pkthdr const * const header,
uint32_t l4_data_len,
struct nDPI_flow_info const * const flow)
{
struct nDPI_workflow const * const workflow = reader_thread->workflow;
char src_addr_str[INET6_ADDRSTRLEN+1] = {0};
char dst_addr_str[INET6_ADDRSTRLEN+1] = {0};
char buf[256];
int used = 0, ret;
ret = ndpi_snprintf(buf, sizeof(buf), "[%8llu, %d, %4u] %4u bytes: ",
workflow->packets_captured, reader_thread->array_index,
flow->flow_id, header->caplen);
if (ret > 0) {
used += ret;
}
if (ip_tuple_to_string(flow, src_addr_str, sizeof(src_addr_str), dst_addr_str, sizeof(dst_addr_str)) != 0) {
ret = ndpi_snprintf(buf + used, sizeof(buf) - used, "IP[%s -> %s]", src_addr_str, dst_addr_str);
} else {
ret = ndpi_snprintf(buf + used, sizeof(buf) - used, "IP[ERROR]");
}
if (ret > 0) {
used += ret;
}
switch (flow->l4_protocol) {
case IPPROTO_UDP:
ret = ndpi_snprintf(buf + used, sizeof(buf) - used, " -> UDP[%u -> %u, %u bytes]",
flow->src_port, flow->dst_port, l4_data_len);
break;
case IPPROTO_TCP:
ret = ndpi_snprintf(buf + used, sizeof(buf) - used, " -> TCP[%u -> %u, %u bytes]",
flow->src_port, flow->dst_port, l4_data_len);
break;
case IPPROTO_ICMP:
ret = ndpi_snprintf(buf + used, sizeof(buf) - used, " -> ICMP");
break;
case IPPROTO_ICMPV6:
ret = ndpi_snprintf(buf + used, sizeof(buf) - used, " -> ICMP6");
break;
case IPPROTO_HOPOPTS:
ret = ndpi_snprintf(buf + used, sizeof(buf) - used, " -> ICMP6 Hop-By-Hop");
break;
default:
ret = ndpi_snprintf(buf + used, sizeof(buf) - used, " -> Unknown[0x%X]", flow->l4_protocol);
break;
}
if (ret > 0) {
used += ret;
}
printf("%.*s\n", used, buf);
}
#endif
static int ip_tuples_compare(struct nDPI_flow_info const * const A, struct nDPI_flow_info const * const B)
{
// generate a warning if the enum changes
switch (A->l3_type)
{
case L3_IP:
case L3_IP6:
break;
}
if (A->l3_type == L3_IP && B->l3_type == L3_IP)
{
if (A->ip_tuple.v4.src < B->ip_tuple.v4.src)
{
return -1;
}
if (A->ip_tuple.v4.src > B->ip_tuple.v4.src)
{
return 1;
}
if (A->ip_tuple.v4.dst < B->ip_tuple.v4.dst)
{
return -1;
}
if (A->ip_tuple.v4.dst > B->ip_tuple.v4.dst)
{
return 1;
}
}
else if (A->l3_type == L3_IP6 && B->l3_type == L3_IP6)
{
/* compare the 128-bit addresses lexicographically, one 64-bit half at a time */
if (A->ip_tuple.v6.src[0] != B->ip_tuple.v6.src[0])
{
return (A->ip_tuple.v6.src[0] < B->ip_tuple.v6.src[0] ? -1 : 1);
}
if (A->ip_tuple.v6.src[1] != B->ip_tuple.v6.src[1])
{
return (A->ip_tuple.v6.src[1] < B->ip_tuple.v6.src[1] ? -1 : 1);
}
if (A->ip_tuple.v6.dst[0] != B->ip_tuple.v6.dst[0])
{
return (A->ip_tuple.v6.dst[0] < B->ip_tuple.v6.dst[0] ? -1 : 1);
}
if (A->ip_tuple.v6.dst[1] != B->ip_tuple.v6.dst[1])
{
return (A->ip_tuple.v6.dst[1] < B->ip_tuple.v6.dst[1] ? -1 : 1);
}
}
if (A->src_port < B->src_port)
{
return -1;
}
if (A->src_port > B->src_port)
{
return 1;
}
if (A->dst_port < B->dst_port)
{
return -1;
}
if (A->dst_port > B->dst_port)
{
return 1;
}
return 0;
}
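A binary-tree comparator has to impose a total order: for any two distinct keys, exactly one must compare smaller. A test that requires *both* 64-bit halves of an IPv6 address to be smaller (`a[0] < b[0] && a[1] < b[1]`) leaves pairs such as `{1, 5}` vs `{2, 0}` undecided, so distinct addresses can fall through to the port comparison and look equal. Comparing half by half, as in this sketch (the function name is hypothetical), avoids that. Because the halves are raw packet bytes loaded in host byte order, the result is not numeric address order, but any consistent total order is enough for tree lookups.

```c
#include <stdint.h>

/* Hypothetical helper: lexicographic comparison of two 128-bit values stored
 * as two uint64_t halves. The first differing half decides; returns -1, 0 or 1. */
static int cmp_u128_halves(const uint64_t a[2], const uint64_t b[2])
{
  if (a[0] != b[0]) {
    return a[0] < b[0] ? -1 : 1;
  }
  if (a[1] != b[1]) {
    return a[1] < b[1] ? -1 : 1;
  }
  return 0;
}
```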
static void ndpi_idle_scan_walker(void const * const A, ndpi_VISIT which, int depth, void * const user_data)
{
struct nDPI_workflow * const workflow = (struct nDPI_workflow *)user_data;
struct nDPI_flow_info * const flow = *(struct nDPI_flow_info **)A;
(void)depth;
if (workflow == NULL || flow == NULL) {
return;
}
if (workflow->cur_idle_flows == MAX_IDLE_FLOWS_PER_THREAD) {
return;
}
if (which == ndpi_preorder || which == ndpi_leaf) {
if ((flow->flow_fin_ack_seen == 1 && flow->flow_ack_seen == 1) ||
flow->last_seen + MAX_IDLE_TIME < workflow->last_time)
{
char src_addr_str[INET6_ADDRSTRLEN+1];
char dst_addr_str[INET6_ADDRSTRLEN+1];
ip_tuple_to_string(flow, src_addr_str, sizeof(src_addr_str), dst_addr_str, sizeof(dst_addr_str));
workflow->ndpi_flows_idle[workflow->cur_idle_flows++] = flow;
workflow->total_idle_flows++;
}
}
}
static int ndpi_workflow_node_cmp(void const * const A, void const * const B) {
struct nDPI_flow_info const * const flow_info_a = (struct nDPI_flow_info *)A;
struct nDPI_flow_info const * const flow_info_b = (struct nDPI_flow_info *)B;
if (flow_info_a->hashval < flow_info_b->hashval) {
return(-1);
} else if (flow_info_a->hashval > flow_info_b->hashval) {
return(1);
}
/* Flows have the same hash */
if (flow_info_a->l4_protocol < flow_info_b->l4_protocol) {
return(-1);
} else if (flow_info_a->l4_protocol > flow_info_b->l4_protocol) {
return(1);
}
return ip_tuples_compare(flow_info_a, flow_info_b);
}
static void check_for_idle_flows(struct nDPI_workflow * const workflow)
{
size_t idle_scan_index;
if (workflow->last_idle_scan_time + IDLE_SCAN_PERIOD < workflow->last_time) {
for (idle_scan_index = 0; idle_scan_index < workflow->max_active_flows; ++idle_scan_index) {
ndpi_twalk(workflow->ndpi_flows_active[idle_scan_index], ndpi_idle_scan_walker, workflow);
while (workflow->cur_idle_flows > 0) {
struct nDPI_flow_info * const f =
(struct nDPI_flow_info *)workflow->ndpi_flows_idle[--workflow->cur_idle_flows];
if (f->flow_fin_ack_seen == 1) {
printf("Free fin flow with id %u\n", f->flow_id);
} else {
printf("Free idle flow with id %u\n", f->flow_id);
}
ndpi_tdelete(f, &workflow->ndpi_flows_active[idle_scan_index],
ndpi_workflow_node_cmp);
ndpi_flow_info_freer(f);
workflow->cur_active_flows--;
}
}
workflow->last_idle_scan_time = workflow->last_time;
}
}
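The idle-flow scan works in two phases: the walker only *collects* expired flows into a bounded array, and the deletions happen after the walk returns, because removing nodes while the tree is still being traversed is unsafe. The sketch below shows the same pattern with the POSIX `tsearch`/`twalk`/`tdelete` family, which the `ndpi_t*` calls in the listing closely resemble (nDPI's walker additionally receives a user-data pointer, which is why POSIX code has to keep the walker's state in statics). The struct, the capacity and all names are hypothetical; flows beyond the capacity are simply picked up by a later sweep.

```c
#define _XOPEN_SOURCE 700
#include <search.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical, simplified flow record: only what the sweep needs. */
struct demo_flow {
  uint32_t id;
  uint64_t last_seen;
};

#define DEMO_MAX_IDLE 16

/* POSIX twalk() has no user-data argument, so the walker state lives in statics. */
static struct demo_flow *idle_buf[DEMO_MAX_IDLE];
static size_t idle_count;
static uint64_t sweep_now;
static uint64_t sweep_max_idle;

static int demo_cmp(const void *a, const void *b)
{
  const struct demo_flow *x = a;
  const struct demo_flow *y = b;
  return (x->id > y->id) - (x->id < y->id);
}

static void demo_walker(const void *node, VISIT which, int depth)
{
  (void)depth;
  if (which != preorder && which != leaf) {
    return; /* inner nodes are reported several times; handle each node once */
  }
  struct demo_flow *f = *(struct demo_flow * const *)node;
  if (idle_count < DEMO_MAX_IDLE && f->last_seen + sweep_max_idle < sweep_now) {
    idle_buf[idle_count++] = f;
  }
}

/* Phase 1 collects idle flows during the walk; phase 2 deletes them afterwards.
 * Returns the number of flows removed from the tree and freed. */
static size_t sweep_idle(void **root, uint64_t now, uint64_t max_idle)
{
  size_t removed = 0;
  sweep_now = now;
  sweep_max_idle = max_idle;
  idle_count = 0;
  twalk(*root, demo_walker);
  while (idle_count > 0) {
    struct demo_flow *f = idle_buf[--idle_count];
    tdelete(f, root, demo_cmp); /* the key is still valid here ... */
    free(f);                    /* ... and is released only after it leaves the tree */
    removed++;
  }
  return removed;
}
```

With `now = 105` and `max_idle = 50`, flows last seen at 0 are removed while flows last seen at 100 stay in the tree.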
static void ndpi_process_packet(uint8_t * const args,
struct pcap_pkthdr const * const header,
uint8_t const * const packet)
{
struct nDPI_reader_thread * const reader_thread =
(struct nDPI_reader_thread *)args;
struct nDPI_workflow * workflow;
struct nDPI_flow_info flow;
size_t hashed_index;
void * tree_result;
struct nDPI_flow_info * flow_to_process;
const struct ndpi_ethhdr * ethernet;
const struct ndpi_iphdr * ip;
struct ndpi_ipv6hdr * ip6;
uint64_t time_ms;
const uint16_t eth_offset = 0;
uint16_t ip_offset;
uint16_t ip_size;
const uint8_t * l4_ptr = NULL;
uint16_t l4_len = 0;
uint16_t type;
uint32_t thread_index = INITIAL_THREAD_HASH; // generated with `dd if=/dev/random bs=1024 count=1 |& hd'
memset(&flow, '\0', sizeof(flow));
if (reader_thread == NULL) {
return;
}
workflow = reader_thread->workflow;
if (workflow == NULL) {
return;
}
workflow->packets_captured++;
time_ms = ((uint64_t) header->ts.tv_sec) * TICK_RESOLUTION + header->ts.tv_usec / (1000000 / TICK_RESOLUTION);
workflow->last_time = time_ms;
check_for_idle_flows(workflow);
/* process datalink layer */
switch (pcap_datalink(workflow->pcap_handle)) {
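case DLT_NULL:
/* BSD loopback: a 4-byte address family in the byte order of the host that captured the packet */
if (header->caplen < 4 + eth_offset) {
fprintf(stderr, "[%8llu, %d] Loopback packet too short - skipping\n",
workflow->packets_captured, reader_thread->array_index);
return;
}
{
uint32_t const family = *((uint32_t const *)&packet[eth_offset]);
type = ((family == 2 || ntohl(family) == 2) ? ETH_P_IP : ETH_P_IPV6); /* 2 == AF_INET */
}
ip_offset = 4 + eth_offset;
break;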
case DLT_EN10MB:
if (header->caplen < sizeof(struct ndpi_ethhdr)) {
fprintf(stderr, "[%8llu, %d] Ethernet packet too short - skipping\n",
workflow->packets_captured, reader_thread->array_index);
return;
}
ethernet = (struct ndpi_ethhdr *) &packet[eth_offset];
ip_offset = sizeof(struct ndpi_ethhdr) + eth_offset;
type = ntohs(ethernet->h_proto);
switch (type) {
case ETH_P_IP: /* IPv4 */
if (header->caplen < sizeof(struct ndpi_ethhdr) + sizeof(struct ndpi_iphdr)) {
fprintf(stderr, "[%8llu, %d] IP packet too short - skipping\n",
workflow->packets_captured, reader_thread->array_index);
return;
}
break;
case ETH_P_IPV6: /* IPV6 */
if (header->caplen < sizeof(struct ndpi_ethhdr) + sizeof(struct ndpi_ipv6hdr)) {
fprintf(stderr, "[%8llu, %d] IP6 packet too short - skipping\n",
workflow->packets_captured, reader_thread->array_index);
return;
}
break;
case ETH_P_ARP: /* ARP */
return;
default:
fprintf(stderr, "[%8llu, %d] Unknown Ethernet packet with type 0x%X - skipping\n",
workflow->packets_captured, reader_thread->array_index, type);
return;
}
break;
default:
fprintf(stderr, "[%8llu, %d] Captured non IP/Ethernet packet with datalink type 0x%X - skipping\n",
workflow->packets_captured, reader_thread->array_index, pcap_datalink(workflow->pcap_handle));
return;
}
if (type == ETH_P_IP) {
ip = (struct ndpi_iphdr *)&packet[ip_offset];
ip6 = NULL;
} else if (type == ETH_P_IPV6) {
ip = NULL;
ip6 = (struct ndpi_ipv6hdr *)&packet[ip_offset];
} else {
fprintf(stderr, "[%8llu, %d] Captured non IPv4/IPv6 packet with type 0x%X - skipping\n",
workflow->packets_captured, reader_thread->array_index, type);
return;
}
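if (header->caplen < ip_offset) {
fprintf(stderr, "[%8llu, %d] Captured packet shorter than its link-layer header - skipping\n",
workflow->packets_captured, reader_thread->array_index);
return;
}
/* only caplen bytes are in the buffer, so bound all further parsing by caplen */
ip_size = header->caplen - ip_offset;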
if (type == ETH_P_IP && header->len >= ip_offset) {
if (header->caplen < header->len) {
fprintf(stderr, "[%8llu, %d] Captured packet size is smaller than packet size: %u < %u\n",
workflow->packets_captured, reader_thread->array_index, header->caplen, header->len);
}
}
/* process layer3 e.g. IPv4 / IPv6 */
if (ip != NULL && ip->version == 4) {
if (ip_size < sizeof(*ip)) {
fprintf(stderr, "[%8llu, %d] Packet smaller than IP4 header length: %u < %zu\n",
workflow->packets_captured, reader_thread->array_index, ip_size, sizeof(*ip));
return;
}
flow.l3_type = L3_IP;
if (ndpi_detection_get_l4((uint8_t*)ip, ip_size, &l4_ptr, &l4_len,
&flow.l4_protocol, NDPI_DETECTION_ONLY_IPV4) != 0)
{
fprintf(stderr, "[%8llu, %d] nDPI IPv4/L4 payload detection failed, L4 length: %zu\n",
workflow->packets_captured, reader_thread->array_index, ip_size - sizeof(*ip));
return;
}
flow.ip_tuple.v4.src = ip->saddr;
flow.ip_tuple.v4.dst = ip->daddr;
uint32_t min_addr = (flow.ip_tuple.v4.src > flow.ip_tuple.v4.dst ?
flow.ip_tuple.v4.dst : flow.ip_tuple.v4.src);
thread_index = min_addr + ip->protocol;
} else if (ip6 != NULL) {
if (ip_size < sizeof(ip6->ip6_hdr)) {
fprintf(stderr, "[%8llu, %d] Packet smaller than IP6 header length: %u < %zu\n",
workflow->packets_captured, reader_thread->array_index, ip_size, sizeof(ip6->ip6_hdr));
return;
}
flow.l3_type = L3_IP6;
if (ndpi_detection_get_l4((uint8_t*)ip6, ip_size, &l4_ptr, &l4_len,
&flow.l4_protocol, NDPI_DETECTION_ONLY_IPV6) != 0)
{
fprintf(stderr, "[%8llu, %d] nDPI IPv6/L4 payload detection failed, L4 length: %zu\n",
workflow->packets_captured, reader_thread->array_index, ip_size - sizeof(*ip6));
return;
}
flow.ip_tuple.v6.src[0] = ip6->ip6_src.u6_addr.u6_addr64[0];
flow.ip_tuple.v6.src[1] = ip6->ip6_src.u6_addr.u6_addr64[1];
flow.ip_tuple.v6.dst[0] = ip6->ip6_dst.u6_addr.u6_addr64[0];
flow.ip_tuple.v6.dst[1] = ip6->ip6_dst.u6_addr.u6_addr64[1];
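uint64_t min_addr[2];
/* pick the (lexicographically) smaller address so both directions map to the same thread */
if (flow.ip_tuple.v6.src[0] > flow.ip_tuple.v6.dst[0] ||
(flow.ip_tuple.v6.src[0] == flow.ip_tuple.v6.dst[0] &&
flow.ip_tuple.v6.src[1] > flow.ip_tuple.v6.dst[1]))
{
min_addr[0] = flow.ip_tuple.v6.dst[0];
min_addr[1] = flow.ip_tuple.v6.dst[1];
} else {
min_addr[0] = flow.ip_tuple.v6.src[0];
min_addr[1] = flow.ip_tuple.v6.src[1];
}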
thread_index = min_addr[0] + min_addr[1] + ip6->ip6_hdr.ip6_un1_nxt;
} else {
fprintf(stderr, "[%8llu, %d] Non IP/IPv6 protocol detected: 0x%X\n",
workflow->packets_captured, reader_thread->array_index, type);
return;
}
/* process layer4 e.g. TCP / UDP */
if (flow.l4_protocol == IPPROTO_TCP) {
const struct ndpi_tcphdr * tcp;
if (header->caplen < (l4_ptr - packet) + sizeof(struct ndpi_tcphdr)) {
fprintf(stderr, "[%8llu, %d] Malformed TCP packet, captured size smaller than expected: %u < %zu\n",
workflow->packets_captured, reader_thread->array_index,
header->caplen, (l4_ptr - packet) + sizeof(struct ndpi_tcphdr));
return;
}
tcp = (struct ndpi_tcphdr *)l4_ptr;
flow.is_midstream_flow = (tcp->syn == 0 ? 1 : 0);
flow.flow_fin_ack_seen = (tcp->fin == 1 && tcp->ack == 1 ? 1 : 0);
flow.flow_ack_seen = tcp->ack;
flow.src_port = ntohs(tcp->source);
flow.dst_port = ntohs(tcp->dest);
} else if (flow.l4_protocol == IPPROTO_UDP) {
const struct ndpi_udphdr * udp;
if (header->caplen < (l4_ptr - packet) + sizeof(struct ndpi_udphdr)) {
fprintf(stderr, "[%8llu, %d] Malformed UDP packet, captured size smaller than expected: %u < %zu\n",
workflow->packets_captured, reader_thread->array_index,
header->caplen, (l4_ptr - packet) + sizeof(struct ndpi_udphdr));
return;
}
udp = (struct ndpi_udphdr *)l4_ptr;
flow.src_port = ntohs(udp->source);
flow.dst_port = ntohs(udp->dest);
}
/* distribute flows to threads while keeping stability (same flow goes always to same thread) */
thread_index += (flow.src_port < flow.dst_port ? flow.dst_port : flow.src_port);
thread_index %= reader_thread_count;
if (thread_index != reader_thread->array_index) {
return;
}
workflow->packets_processed++;
workflow->total_l4_data_len += l4_len;
#ifdef VERBOSE
print_packet_info(reader_thread, header, l4_len, &flow);
#endif
{
uint64_t tmp[4] = {0};
/* calculate flow hash for btree find, search(insert) */
if (flow.l3_type == L3_IP) {
if (ndpi_flowv4_flow_hash(flow.l4_protocol, flow.ip_tuple.v4.src, flow.ip_tuple.v4.dst,
flow.src_port, flow.dst_port, 0, 0,
(uint8_t *)&tmp[0], sizeof(tmp)) != 0)
{
flow.hashval = flow.ip_tuple.v4.src + flow.ip_tuple.v4.dst; // fallback
} else {
flow.hashval = tmp[0] + tmp[1] + tmp[2] + tmp[3];
}
} else if (flow.l3_type == L3_IP6) {
if (ndpi_flowv6_flow_hash(flow.l4_protocol, &ip6->ip6_src, &ip6->ip6_dst,
flow.src_port, flow.dst_port, 0, 0,
(uint8_t *)&tmp[0], sizeof(tmp)) != 0)
{
flow.hashval = flow.ip_tuple.v6.src[0] + flow.ip_tuple.v6.src[1];
flow.hashval += flow.ip_tuple.v6.dst[0] + flow.ip_tuple.v6.dst[1];
} else {
flow.hashval = tmp[0] + tmp[1] + tmp[2] + tmp[3];
}
}
flow.hashval += flow.l4_protocol + flow.src_port + flow.dst_port;
}
hashed_index = flow.hashval % workflow->max_active_flows;
tree_result = ndpi_tfind(&flow, &workflow->ndpi_flows_active[hashed_index], ndpi_workflow_node_cmp);
if (tree_result == NULL) {
/* flow not found in btree: switch src <-> dst and try to find it again */
uint32_t orig_src_ip[4] = { flow.ip_tuple.u32.src[0], flow.ip_tuple.u32.src[1],
flow.ip_tuple.u32.src[2], flow.ip_tuple.u32.src[3] };
uint32_t orig_dst_ip[4] = { flow.ip_tuple.u32.dst[0], flow.ip_tuple.u32.dst[1],
flow.ip_tuple.u32.dst[2], flow.ip_tuple.u32.dst[3] };
uint16_t orig_src_port = flow.src_port;
uint16_t orig_dst_port = flow.dst_port;
flow.ip_tuple.u32.src[0] = orig_dst_ip[0];
flow.ip_tuple.u32.src[1] = orig_dst_ip[1];
flow.ip_tuple.u32.src[2] = orig_dst_ip[2];
flow.ip_tuple.u32.src[3] = orig_dst_ip[3];
flow.ip_tuple.u32.dst[0] = orig_src_ip[0];
flow.ip_tuple.u32.dst[1] = orig_src_ip[1];
flow.ip_tuple.u32.dst[2] = orig_src_ip[2];
flow.ip_tuple.u32.dst[3] = orig_src_ip[3];
flow.src_port = orig_dst_port;
flow.dst_port = orig_src_port;
tree_result = ndpi_tfind(&flow, &workflow->ndpi_flows_active[hashed_index], ndpi_workflow_node_cmp);
flow.ip_tuple.u32.src[0] = orig_src_ip[0];
flow.ip_tuple.u32.src[1] = orig_src_ip[1];
flow.ip_tuple.u32.src[2] = orig_src_ip[2];
flow.ip_tuple.u32.src[3] = orig_src_ip[3];
flow.ip_tuple.u32.dst[0] = orig_dst_ip[0];
flow.ip_tuple.u32.dst[1] = orig_dst_ip[1];
flow.ip_tuple.u32.dst[2] = orig_dst_ip[2];
flow.ip_tuple.u32.dst[3] = orig_dst_ip[3];
flow.src_port = orig_src_port;
flow.dst_port = orig_dst_port;
}
if (tree_result == NULL) {
/* flow still not found, must be new */
if (workflow->cur_active_flows == workflow->max_active_flows) {
fprintf(stderr, "[%8llu, %d] max flows to track reached: %llu, idle: %llu\n",
workflow->packets_captured, reader_thread->array_index,
workflow->max_active_flows, workflow->cur_idle_flows);
return;
}
flow_to_process = (struct nDPI_flow_info *)ndpi_malloc(sizeof(*flow_to_process));
if (flow_to_process == NULL) {
fprintf(stderr, "[%8llu, %d] Not enough memory for flow info\n",
workflow->packets_captured, reader_thread->array_index);
return;
}
memcpy(flow_to_process, &flow, sizeof(*flow_to_process));
flow_to_process->flow_id = __sync_fetch_and_add(&flow_id, 1);
flow_to_process->ndpi_flow = (struct ndpi_flow_struct *)ndpi_flow_malloc(SIZEOF_FLOW_STRUCT);
if (flow_to_process->ndpi_flow == NULL) {
fprintf(stderr, "[%8llu, %d, %4u] Not enough memory for flow struct\n",
workflow->packets_captured, reader_thread->array_index, flow_to_process->flow_id);
ndpi_free(flow_to_process);
return;
}
memset(flow_to_process->ndpi_flow, 0, SIZEOF_FLOW_STRUCT);
printf("[%8llu, %d, %4u] new %sflow\n", workflow->packets_captured, reader_thread->array_index,
flow_to_process->flow_id,
(flow_to_process->is_midstream_flow != 0 ? "midstream-" : ""));
if (ndpi_tsearch(flow_to_process, &workflow->ndpi_flows_active[hashed_index], ndpi_workflow_node_cmp) == NULL) {
/* tsearch only fails when it cannot allocate a tree node; release the flow instead of leaking it */
ndpi_flow_info_freer(flow_to_process);
return;
}
workflow->cur_active_flows++;
workflow->total_active_flows++;
} else {
flow_to_process = *(struct nDPI_flow_info **)tree_result;
}
flow_to_process->packets_processed++;
flow_to_process->total_l4_data_len += l4_len;
/* update timestamps, important for timeout handling */
if (flow_to_process->first_seen == 0) {
flow_to_process->first_seen = time_ms;
}
flow_to_process->last_seen = time_ms;
/* is the current packet a TCP ACK? */
flow_to_process->flow_ack_seen = flow.flow_ack_seen;
/* TCP-FIN: indicates that at least one side wants to end the connection */
if (flow.flow_fin_ack_seen != 0 && flow_to_process->flow_fin_ack_seen == 0) {
flow_to_process->flow_fin_ack_seen = 1;
printf("[%8llu, %d, %4u] end of flow\n", workflow->packets_captured, reader_thread->array_index,
flow_to_process->flow_id);
return;
}
/*
* num_processed_pkts is treated as an 8-bit counter: feed nDPI until it reaches 0xFF,
* and ask for a best-effort guess one packet before that limit (0xFE).
*/
if (flow_to_process->ndpi_flow->num_processed_pkts == 0xFF) {
return;
} else if (flow_to_process->ndpi_flow->num_processed_pkts == 0xFE) {
/* last chance to guess something, better than nothing */
uint8_t protocol_was_guessed = 0;
flow_to_process->guessed_protocol =
ndpi_detection_giveup(workflow->ndpi_struct,
flow_to_process->ndpi_flow,
&protocol_was_guessed);
if (protocol_was_guessed != 0) {
printf("[%8llu, %d, %4d][GUESSED] protocol: %s | app protocol: %s | category: %s\n",
workflow->packets_captured,
reader_thread->array_index,
flow_to_process->flow_id,
ndpi_get_proto_name(workflow->ndpi_struct, flow_to_process->guessed_protocol.proto.master_protocol),
ndpi_get_proto_name(workflow->ndpi_struct, flow_to_process->guessed_protocol.proto.app_protocol),
ndpi_category_get_name(workflow->ndpi_struct, flow_to_process->guessed_protocol.category));
} else {
printf("[%8llu, %d, %4d][FLOW NOT CLASSIFIED]\n",
workflow->packets_captured, reader_thread->array_index, flow_to_process->flow_id);
}
}
flow_to_process->detected_l7_protocol =
ndpi_detection_process_packet(workflow->ndpi_struct, flow_to_process->ndpi_flow,
ip != NULL ? (uint8_t *)ip : (uint8_t *)ip6,
ip_size, time_ms, NULL);
if (ndpi_is_protocol_detected(flow_to_process->detected_l7_protocol) != 0 &&
flow_to_process->detection_completed == 0)
{
if (flow_to_process->detected_l7_protocol.proto.master_protocol != NDPI_PROTOCOL_UNKNOWN ||
flow_to_process->detected_l7_protocol.proto.app_protocol != NDPI_PROTOCOL_UNKNOWN)
{
flow_to_process->detection_completed = 1;
workflow->detected_flow_protocols++;
printf("[%8llu, %d, %4d][DETECTED] protocol: %s | app protocol: %s | category: %s\n",
workflow->packets_captured,
reader_thread->array_index,
flow_to_process->flow_id,
ndpi_get_proto_name(workflow->ndpi_struct, flow_to_process->detected_l7_protocol.proto.master_protocol),
ndpi_get_proto_name(workflow->ndpi_struct, flow_to_process->detected_l7_protocol.proto.app_protocol),
ndpi_category_get_name(workflow->ndpi_struct, flow_to_process->detected_l7_protocol.category));
}
}
if (flow_to_process->ndpi_flow->num_extra_packets_checked <=
flow_to_process->ndpi_flow->max_extra_packets_to_check)
{
/*
* Your business logic starts here.
*
* This example does print some information about
* TLS client and server hellos if available.
*
* You could also use nDPI's built-in json serialization
* and send it to a high-level application for further processing.
*
* EoE - End of Example
*/
if (flow_to_process->flow_info_printed == 0)
{
char const * const flow_info = ndpi_get_flow_info(flow_to_process->ndpi_flow, &flow_to_process->detected_l7_protocol);
if (flow_info != NULL)
{
printf("[%8llu, %d, %4d] info: %s\n",
workflow->packets_captured,
reader_thread->array_index,
flow_to_process->flow_id,
flow_info);
flow_to_process->flow_info_printed = 1;
}
}
if (flow_to_process->detected_l7_protocol.proto.master_protocol == NDPI_PROTOCOL_TLS ||
flow_to_process->detected_l7_protocol.proto.app_protocol == NDPI_PROTOCOL_TLS)
{
if (flow_to_process->tls_client_hello_seen == 0 &&
flow_to_process->ndpi_flow->protos.tls_quic.client_hello_processed != 0)
{
uint8_t unknown_tls_version = 0;
char buf_ver[16];
printf("[%8llu, %d, %4d][TLS-CLIENT-HELLO] version: %s | sni: %s | (advertised) ALPNs: %s\n",
workflow->packets_captured,
reader_thread->array_index,
flow_to_process->flow_id,
ndpi_ssl_version2str(buf_ver, sizeof(buf_ver),
flow_to_process->ndpi_flow->protos.tls_quic.ssl_version,
&unknown_tls_version),
flow_to_process->ndpi_flow->host_server_name,
(flow_to_process->ndpi_flow->protos.tls_quic.advertised_alpns != NULL ?
flow_to_process->ndpi_flow->protos.tls_quic.advertised_alpns : "-"));
flow_to_process->tls_client_hello_seen = 1;
}
if (flow_to_process->tls_server_hello_seen == 0 &&
flow_to_process->ndpi_flow->tls_quic.certificate_processed != 0)
{
uint8_t unknown_tls_version = 0;
char buf_ver[16];
printf("[%8llu, %d, %4d][TLS-SERVER-HELLO] version: %s | common-name(s): %.*s | "
"issuer: %s | subject: %s\n",
workflow->packets_captured,
reader_thread->array_index,
flow_to_process->flow_id,
ndpi_ssl_version2str(buf_ver, sizeof(buf_ver),
flow_to_process->ndpi_flow->protos.tls_quic.ssl_version,
&unknown_tls_version),
(flow_to_process->ndpi_flow->protos.tls_quic.server_names_len == 0 ?
1 : flow_to_process->ndpi_flow->protos.tls_quic.server_names_len),
(flow_to_process->ndpi_flow->protos.tls_quic.server_names == NULL ?
"-" : flow_to_process->ndpi_flow->protos.tls_quic.server_names),
(flow_to_process->ndpi_flow->protos.tls_quic.issuerDN != NULL ?
flow_to_process->ndpi_flow->protos.tls_quic.issuerDN : "-"),
(flow_to_process->ndpi_flow->protos.tls_quic.subjectDN != NULL ?
flow_to_process->ndpi_flow->protos.tls_quic.subjectDN : "-"));
flow_to_process->tls_server_hello_seen = 1;
}
}
}
}
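Every reader thread opens the same device or file and sees every packet; each one keeps only the flows whose key maps to its own `array_index`. For that to be stable, the key must not depend on packet direction. The listing builds it from the smaller IP address, the L4 protocol number and the larger port, all of which stay the same when source and destination are swapped, and then reduces it modulo the thread count. A minimal IPv4-only sketch of that rule (the function name is hypothetical):

```c
#include <stdint.h>

/* Hypothetical helper mirroring the listing's IPv4 thread selection:
 * every term is unchanged when (src, dst) and the ports are swapped. */
static uint32_t pick_thread_v4(uint32_t src_ip, uint32_t dst_ip,
                               uint16_t src_port, uint16_t dst_port,
                               uint8_t l4_proto, uint32_t thread_count)
{
  uint32_t min_ip = (src_ip < dst_ip) ? src_ip : dst_ip;
  uint16_t max_port = (src_port > dst_port) ? src_port : dst_port;
  /* unsigned arithmetic: any wrap-around is well defined */
  return (min_ip + l4_proto + max_port) % thread_count;
}
```

For 10.0.0.1:12345 to 10.0.0.2:443 over TCP (protocol 6) with 4 threads, both directions land on thread 0.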
static void run_pcap_loop(struct nDPI_reader_thread const * const reader_thread)
{
if (reader_thread->workflow != NULL &&
reader_thread->workflow->pcap_handle != NULL) {
if (pcap_loop(reader_thread->workflow->pcap_handle, -1,
&ndpi_process_packet, (uint8_t *)reader_thread) == PCAP_ERROR) {
fprintf(stderr, "Error while reading pcap file: '%s'\n",
pcap_geterr(reader_thread->workflow->pcap_handle));
__sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1);
}
}
}
static void break_pcap_loop(struct nDPI_reader_thread * const reader_thread)
{
if (reader_thread->workflow != NULL &&
reader_thread->workflow->pcap_handle != NULL)
{
pcap_breakloop(reader_thread->workflow->pcap_handle);
}
}
static void * processing_thread(void * const ndpi_thread_arg)
{
struct nDPI_reader_thread const * const reader_thread =
(struct nDPI_reader_thread *)ndpi_thread_arg;
printf("Starting Thread %d\n", reader_thread->array_index);
run_pcap_loop(reader_thread);
__sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1);
return NULL;
}
static int processing_threads_error_or_eof(void)
{
int i;
for (i = 0; i < reader_thread_count; ++i) {
if (__sync_fetch_and_add(&reader_threads[i].workflow->error_or_eof, 0) == 0) {
return 0;
}
}
return 1;
}
static int start_reader_threads(void)
{
int i;
int err;
#ifndef WIN32
sigset_t thread_signal_set, old_signal_set;
/* block every signal except SIGINT/SIGTERM; threads created below inherit this mask */
sigfillset(&thread_signal_set);
sigdelset(&thread_signal_set, SIGINT);
sigdelset(&thread_signal_set, SIGTERM);
err = pthread_sigmask(SIG_BLOCK, &thread_signal_set, &old_signal_set);
if (err != 0) {
fprintf(stderr, "pthread_sigmask: %s\n", strerror(err));
return 1;
}
#endif
for (i = 0; i < reader_thread_count; ++i) {
reader_threads[i].array_index = i;
if (reader_threads[i].workflow == NULL) {
/* no more threads should be started */
break;
}
/* pthread_create() returns an error number instead of setting errno */
err = pthread_create(&reader_threads[i].thread_id, NULL,
processing_thread, &reader_threads[i]);
if (err != 0)
{
fprintf(stderr, "pthread_create: %s\n", strerror(err));
return 1;
}
}
#ifndef WIN32
/* restore the main thread's original mask (SIG_SETMASK; SIG_BLOCK would only add to it) */
err = pthread_sigmask(SIG_SETMASK, &old_signal_set, NULL);
if (err != 0) {
fprintf(stderr, "pthread_sigmask: %s\n", strerror(err));
return 1;
}
#endif
return 0;
}
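A new thread inherits the signal mask of the thread that creates it, which is why the mask is changed *before* `pthread_create()` and put back afterwards. Restoring requires `SIG_SETMASK` with the saved set; `SIG_BLOCK` would only add signals. Note also that `pthread_sigmask()`, `pthread_create()` and `pthread_join()` return an error number rather than setting `errno`. The sketch below demonstrates the inheritance with `SIGUSR1`; the helper names are hypothetical.

```c
#define _POSIX_C_SOURCE 200809L
#include <pthread.h>
#include <signal.h>
#include <stddef.h>

/* Records whether SIGUSR1 is blocked in the new thread (1 = blocked). */
static void *report_mask(void *arg)
{
  sigset_t current;
  pthread_sigmask(SIG_BLOCK, NULL, &current); /* set == NULL: query only */
  *(int *)arg = sigismember(&current, SIGUSR1);
  return NULL;
}

/* Hypothetical helper: start one thread with SIGUSR1 blocked, then restore
 * the caller's mask. Returns 0 or the first error number encountered. */
static int spawn_with_sigusr1_blocked(int *thread_saw_blocked)
{
  sigset_t block, old;
  pthread_t tid;
  int err, restore_err;

  sigemptyset(&block);
  sigaddset(&block, SIGUSR1);
  err = pthread_sigmask(SIG_BLOCK, &block, &old);
  if (err != 0) {
    return err;
  }
  err = pthread_create(&tid, NULL, report_mask, thread_saw_blocked);
  /* SIG_SETMASK puts the saved mask back; SIG_BLOCK would only add to it */
  restore_err = pthread_sigmask(SIG_SETMASK, &old, NULL);
  if (err == 0) {
    err = pthread_join(tid, NULL);
  }
  return err != 0 ? err : restore_err;
}
```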
static int stop_reader_threads(void)
{
int i;
unsigned long long int total_packets_captured = 0;
unsigned long long int total_packets_processed = 0;
unsigned long long int total_l4_data_len = 0;
unsigned long long int total_flows_captured = 0;
unsigned long long int total_flows_idle = 0;
unsigned long long int total_flows_detected = 0;
for (i = 0; i < reader_thread_count; ++i) {
break_pcap_loop(&reader_threads[i]);
}
printf("------------------------------------ Stopping reader threads\n");
for (i = 0; i < reader_thread_count; ++i) {
if (reader_threads[i].workflow == NULL) {
continue;
}
int const join_err = pthread_join(reader_threads[i].thread_id, NULL);
if (join_err != 0) {
fprintf(stderr, "pthread_join: %s\n", strerror(join_err));
}
total_packets_processed += reader_threads[i].workflow->packets_processed;
total_l4_data_len += reader_threads[i].workflow->total_l4_data_len;
total_flows_captured += reader_threads[i].workflow->total_active_flows;
total_flows_idle += reader_threads[i].workflow->total_idle_flows;
total_flows_detected += reader_threads[i].workflow->detected_flow_protocols;
printf("Stopping Thread %d, processed %10llu packets, %12llu bytes, total flows: %8llu, "
"idle flows: %8llu, detected flows: %8llu\n",
reader_threads[i].array_index, reader_threads[i].workflow->packets_processed,
reader_threads[i].workflow->total_l4_data_len, reader_threads[i].workflow->total_active_flows,
reader_threads[i].workflow->total_idle_flows, reader_threads[i].workflow->detected_flow_protocols);
}
/* total packets captured: same value for all threads as packet2thread distribution happens later */
total_packets_captured = reader_threads[0].workflow->packets_captured;
for (i = 0; i < reader_thread_count; ++i) {
if (reader_threads[i].workflow == NULL) {
continue;
}
free_workflow(&reader_threads[i].workflow);
}
printf("Total packets captured.: %llu\n", total_packets_captured);
printf("Total packets processed: %llu\n", total_packets_processed);
printf("Total layer4 data size.: %llu\n", total_l4_data_len);
printf("Total flows captured...: %llu\n", total_flows_captured);
printf("Total flows timed out..: %llu\n", total_flows_idle);
printf("Total flows detected...: %llu\n", total_flows_detected);
return 0;
}
static void sighandler(int signum)
{
fprintf(stderr, "Received SIGNAL %d\n", signum);
if (__sync_fetch_and_add(&main_thread_shutdown, 0) == 0) {
__sync_fetch_and_add(&main_thread_shutdown, 1);
} else {
fprintf(stderr, "Reader threads are already shutting down, please be patient.\n");
}
}
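A signal handler should do as little as possible: POSIX only guarantees async-signal-safe functions inside a handler, and `fprintf()` is not one of them. A common pattern is to set a `volatile sig_atomic_t` flag and let a polling loop, such as the `sleep(1)` loop in `main()`, notice it. This sketch uses `sigaction()` instead of `signal()` because its behavior is specified precisely across platforms; all names are hypothetical.

```c
#define _POSIX_C_SOURCE 200809L
#include <signal.h>
#include <string.h>

/* A handler may safely assign to a volatile sig_atomic_t object. */
static volatile sig_atomic_t shutdown_requested = 0;

static void on_shutdown_signal(int signum)
{
  (void)signum;
  shutdown_requested = 1; /* no stdio here: fprintf() is not async-signal-safe */
}

/* Hypothetical helper: route SIGINT and SIGTERM to the flag. Returns 0 on success. */
static int install_shutdown_handlers(void)
{
  struct sigaction sa;
  memset(&sa, 0, sizeof(sa));
  sa.sa_handler = on_shutdown_signal;
  sigemptyset(&sa.sa_mask);
  sa.sa_flags = 0;
  if (sigaction(SIGINT, &sa, NULL) != 0 || sigaction(SIGTERM, &sa, NULL) != 0) {
    return -1;
  }
  return 0;
}
```

The main loop then simply tests `shutdown_requested` on each iteration; any logging of the received signal happens there, outside the handler.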
int main(int argc, char ** argv)
{
if (argc == 0) {
printf("usage: ndpiSimpleIntegration <device name>\n");
return 1;
}
printf("usage: %s [PCAP-FILE-OR-INTERFACE]\n"
"----------------------------------\n"
"nDPI version: %s\n"
" API version: %u\n"
"libgcrypt...: %s\n"
"----------------------------------\n",
argv[0],
ndpi_revision(), ndpi_get_api_version(),
(ndpi_get_gcrypt_version() == NULL ? "-" : ndpi_get_gcrypt_version()));
if (setup_reader_threads((argc >= 2 ? argv[1] : NULL)) != 0) {
fprintf(stderr, "%s: setup_reader_threads failed\n", argv[0]);
return 1;
}
if (start_reader_threads() != 0) {
fprintf(stderr, "%s: start_reader_threads failed\n", argv[0]);
return 1;
}
signal(SIGINT, sighandler);
signal(SIGTERM, sighandler);
while (__sync_fetch_and_add(&main_thread_shutdown, 0) == 0 && processing_threads_error_or_eof() == 0) {
sleep(1);
}
if (stop_reader_threads() != 0) {
fprintf(stderr, "%s: stop_reader_threads failed\n", argv[0]);
return 1;
}
return 0;
}