4141# 11-byte signature (constructed in this way to detect possible mangled bytes), flags, header extension
4242# https://www.postgresql.org/docs/9.0/sql-copy.html#AEN59377
4343PG_COPYFROM_INIT = struct .pack ('!11sII' , b'PGCOPY\n \377 \r \n \0 ' , 0 , 0 )
44- # 4-byte INETv4 prefix: family, netmask, is_cidr, n bytes
44+ # 4-byte INETv4/v6 prefix: family, netmask, is_cidr, n bytes
4545# https://doxygen.postgresql.org/network_8c_source.html#l00193
46- IPV4_PREFIX = struct .pack ('!BBBB' , socket .AF_INET , 32 , 0 , 4 )
46+ IPV4_ADDRESS_PREFIX = struct .pack ('!BBBB' , socket .AF_INET , 32 , 0 , 4 )
47+ # Gotcha: IPv6 address family in Postgres is *not* socket.AF_INET6 (10),
48+ # instead it is defined as socket.AF_INET + 1 (2 + 1 == 3)
49+ # https://doxygen.postgresql.org/utils_2inet_8h_source.html#l00040
50+ IPV6_ADDRESS_PREFIX = struct .pack ('!BBBB' , socket .AF_INET + 1 , 128 , 0 , 16 )
4751
4852
4953def _pgwriter_init ():
@@ -52,22 +56,30 @@ def _pgwriter_init():
5256 return pg_writer
5357
5458
55- def _pgwriter_write (pgwriter , ts , client_ip , IN_BYTES , PROTOCOL , DIRECTION , L4_DST_PORT , L4_SRC_PORT , INPUT_SNMP , OUTPUT_SNMP , IPV4_DST_ADDR , IPV4_SRC_ADDR ):
56- buf = struct .pack ('!HiIi4s4siQiHiHiIiIiHiHi4s4si4s4s ' ,
59+ def _pgwriter_write (pgwriter , ts , client_ip , IN_BYTES , PROTOCOL , DIRECTION , L4_DST_PORT , L4_SRC_PORT , INPUT_SNMP , OUTPUT_SNMP , address_family , IPVx_DST_ADDR , IPVx_SRC_ADDR ):
60+ buf = struct .pack ('!HiIi4s4siQiHiHiIiIiHiH ' ,
5761 11 , # number of columns
5862 4 , int (ts ), # integer - beware of Y2038 problem! :)
59- 8 , IPV4_PREFIX , socket .inet_aton (client_ip ), # 4 bytes prefix + 4 bytes IP
63+ 8 , IPV4_ADDRESS_PREFIX , socket .inet_aton (client_ip ), # 4 bytes prefix + 4 bytes IP
6064 8 , IN_BYTES , # bigint
6165 2 , PROTOCOL ,
6266 2 , DIRECTION ,
6367 4 , L4_DST_PORT ,
6468 4 , L4_SRC_PORT ,
6569 2 , INPUT_SNMP ,
6670 2 , OUTPUT_SNMP ,
67- 8 , IPV4_PREFIX , IPV4_DST_ADDR ,
68- 8 , IPV4_PREFIX , IPV4_SRC_ADDR ,
6971 )
70- pgwriter .write (buf )
72+ if address_family == socket .AF_INET6 :
73+ buf2 = struct .pack ('!i4s4si4s4s' ,
74+ 8 , IPV4_ADDRESS_PREFIX , IPVx_DST_ADDR ,
75+ 8 , IPV4_ADDRESS_PREFIX , IPVx_SRC_ADDR ,
76+ )
77+ else :
78+ buf2 = struct .pack ('!i4s16si4s16s' ,
79+ 4 + 16 , IPV6_ADDRESS_PREFIX , IPVx_DST_ADDR ,
80+ 4 + 16 , IPV6_ADDRESS_PREFIX , IPVx_SRC_ADDR ,
81+ )
82+ pgwriter .write (buf + buf2 )
7183
7284
7385def _pgwriter_finish (pgwriter ):
@@ -138,9 +150,8 @@ def process_named_pipe(named_pipe_filename):
138150 except UnknownNetFlowVersion :
139151 log .warning ("Unknown NetFlow version" )
140152 continue
141- except TemplateNotRecognized :
142- log .warning ("Failed to decode a v9 ExportPacket, template not "
143- "recognized (if this happens at the start, it's ok)" )
153+ except TemplateNotRecognized as ex :
154+ log .warning (f"Failed to decode a v9 ExportPacket, template not recognized (if this happens at the start, it's ok). Template id: { ex .template_id } " )
144155 continue
145156
146157 except Exception as ex :
@@ -194,39 +205,34 @@ def write_buffer(buffer, partition_no):
194205
195206
196207 log .debug (f"Writing { len (buffer )} records to DB, partition { partition_no } " )
197- ipv6_ignored_records = 0 # we don't support IPv6 yet
198208 # save each of the flows within the record, but use execute_values() to perform bulk insert:
199209 def _get_data (buffer ):
200210 for ts , client_ip , export in buffer :
201211 netflow_version , flows = export .header .version , export .flows
202212 if netflow_version == 9 :
203213 for f in flows :
204214 try :
205- if f .data .get ("IP_PROTOCOL_VERSION" , 4 ) == 6 :
206- ipv6_ignored_records += 1
207- continue
215+ # if f.data.get("IP_PROTOCOL_VERSION", 4) == 6:
216+ if not f .data .get ("IPV6_DST_ADDR" , None ) is None :
217+ address_family = socket .AF_INET6
218+ ipvX = "IPV6"
219+ else :
220+ address_family = socket .AF_INET
221+ ipvX = "IPV4"
208222
209223 yield (
210224 ts ,
211225 client_ip ,
212- # "IN_BYTES":
213226 f .data ["IN_BYTES" ],
214- # "PROTOCOL":
215227 f .data ["PROTOCOL" ],
216- # "DIRECTION":
217228 f .data .get ("DIRECTION" , DIRECTION_INGRESS ),
218- # "L4_DST_PORT":
219229 f .data ["L4_DST_PORT" ],
220- # "L4_SRC_PORT":
221230 f .data ["L4_SRC_PORT" ],
222- # "INPUT_SNMP":
223231 f .data ["INPUT_SNMP" ],
224- # "OUTPUT_SNMP":
225232 f .data ["OUTPUT_SNMP" ],
226- # "IPV4_DST_ADDR":
227- socket .inet_aton (f .data ["IPV4_DST_ADDR" ]),
228- # "IPV4_SRC_ADDR":
229- socket .inet_aton (f .data ["IPV4_SRC_ADDR" ]),
233+ address_family ,
234+ socket .inet_pton (address_family , f .data [f"{ ipvX } _DST_ADDR" ]),
235+ socket .inet_pton (address_family , f .data [f"{ ipvX } _SRC_ADDR" ]),
230236 )
231237 except KeyError :
232238 log .exception (f"[{ client_ip } ] Error decoding v9 flow. Contents: { repr (f .data )} " )
@@ -250,6 +256,8 @@ def _get_data(buffer):
250256 f .data ["INPUT" ],
251257 # "OUTPUT_SNMP":
252258 f .data ["OUTPUT" ],
259+ # address_family is always IPv4:
260+ socket .AF_INET ,
253261 # netflow v5 IP addresses are decoded to integers, which is less suitable for us - pack
254262 # them back to bytes:
255263 # "IPV4_DST_ADDR":
@@ -267,9 +275,6 @@ def _get_data(buffer):
267275 _pgwriter_write (pgwriter , * data )
268276 _pgwriter_finish (pgwriter )
269277
270- if ipv6_ignored_records > 0 :
271- log .error (f"We do not support IPv6 (yet), some IPv6 flow records were ignored: { ipv6_ignored_records } " )
272-
273278
274279if __name__ == "__main__" :
275280 NAMED_PIPE_FILENAME = os .environ .get ('NAMED_PIPE_FILENAME' , None )
0 commit comments