diff --git a/NERDd/blacklists.py b/NERDd/blacklists.py index 379a2603..3a37fc03 100755 --- a/NERDd/blacklists.py +++ b/NERDd/blacklists.py @@ -18,6 +18,7 @@ # Add to path the "one directory above the current file location" to find modules from "common" sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(os.path.abspath(__file__)), '..'))) +from common.threat_categorization import * from common.utils import parse_rfc_time import common.config import common.task_queue @@ -39,7 +40,6 @@ 'ip': {'singular': "IP", 'plural': "IPs"} } - ############################################################################### def compile_regex(regex): @@ -67,14 +67,16 @@ def parse_bl_with_regex(bl_data, cregex): record_end = ip_match.span()[1] try: # classic IP address blacklist - bl_records.append(str(ipaddress.IPv4Address(bl_data[record_start:record_end]))) + bl_records.append((str(ipaddress.IPv4Address(bl_data[record_start:record_end])), None)) except ipaddress.AddressValueError: continue else: for line in bl_data.split('\n'): match = cregex.search(line) if match: - bl_records.append(str(ipaddress.IPv4Address(match.group(1)))) + ip = str(ipaddress.IPv4Address(match.group(1))) + ip_info = match.group(2) if cregex.groups > 1 else None + bl_records.append((ip, ip_info)) return bl_records @@ -89,7 +91,7 @@ def parse_bl_without_regex(bl_data): ipaddr = ipaddress.IPv4Address(record) except ipaddress.AddressValueError: continue - bl_records.append(str(ipaddr)) + bl_records.append((str(ipaddr), None)) return bl_records @@ -137,7 +139,7 @@ def download_blacklist(blacklist_url, params=None): return "" -def get_blacklist(id, name, url, regex, bl_type, life_length, params): +def get_blacklist(id, name, url, regex, bl_type, life_length, params, categorization_config): """ Download the blacklist, parse all its records, and create worker task for each IP in current blacklist. 
:param id: id of the blacklist @@ -159,12 +161,27 @@ def get_blacklist(id, name, url, regex, bl_type, life_length, params): log.info("{} IPs found in '{}', sending tasks to NERD workers".format(len(bl_records), id)) - for ip in bl_records: - task_queue_writer.put_task('ip', ip, [ + for ip, ip_info in bl_records: + updates = [ ('setmax', '_ttl.bl', now_plus_life_length), ('array_upsert', 'bl', {'n': id}, - [('set', 'v', 1), ('set', 't', download_time), ('append', 'h', download_time)]) - ], "blacklists") + [('set', 'v', 1), ('set', 't', download_time), ('append', 'h', download_time)]), + ] + + # threat categorization updates + for category_data in classify_ip(ip, "blacklists", log, categorization_config, id, ip_info, download_time): + subcategory_updates = [] + for subcategory, values in category_data['subcategories'].items(): + subcategory_updates.append(('extend_set', subcategory, values)) + updates.append(( + 'array_upsert', + '_threat_category', + {'d': category_data['date'], 'c': category_data['id']}, + [('add', 'src.bl', 1), *subcategory_updates] + )) + + # put task in queue + task_queue_writer.put_task('ip', ip, updates, "blacklists") def stop(signal, frame): @@ -216,6 +233,15 @@ def stop(signal, frame): log.info("Loading config file {}".format(common_cfg_file)) config.update(common.config.read_config(common_cfg_file)) + # Read categorization config + categorization_cfg_file = os.path.join(config_base_path, 'threat_categorization.yml') + log.info("Loading config file {}".format(categorization_cfg_file)) + config.update(common.config.read_config(categorization_cfg_file)) + categorization_config = { + "categories": config.get('threat_categories'), + "malware_families": common.config.read_config(config.get('malpedia_family_list_path')) + } + rabbit_config = config.get("rabbitmq") # Get number of processes from config @@ -249,7 +275,7 @@ def stop(signal, frame): assert isinstance(other_params, dict), "The additional parameter must be a dict (in config of 
{}.{})".format( config_path, id) # Process the blacklist - get_blacklist(id, name, url, regex, bl_type, life_length, other_params) + get_blacklist(id, name, url, regex, bl_type, life_length, other_params, categorization_config) # Schedule periodic processing... if not args.one_shot: @@ -266,7 +292,7 @@ def stop(signal, frame): other_params = bl.get('params', {}) trigger = CronTrigger(**refresh_time) - job = scheduler.add_job(get_blacklist, args=(id, name, url, regex, bl_type, life_length, other_params), + job = scheduler.add_job(get_blacklist, args=(id, name, url, regex, bl_type, life_length, other_params, categorization_config), trigger=trigger, coalesce=True, max_instances=1) log.info("{} blacklist '{}' scheduled to be downloaded at every: {}".format( diff --git a/NERDd/dshield.py b/NERDd/dshield.py index ff37584a..fcc85d17 100644 --- a/NERDd/dshield.py +++ b/NERDd/dshield.py @@ -89,11 +89,12 @@ def process_feed(feed_data): if (ips[ip_addr]["reports"] < min_reports) or (ips[ip_addr]["targets"] < min_targets): continue tq_writer.put_task('ip', ip_addr, [ - ('array_upsert', 'dshield', {'date' : date_str}, - [('set', 'reports', ips[ip_addr]["reports"]), - ('set', 'targets', ips[ip_addr]["targets"])]), - ('setmax', '_ttl.dshield', ttl_date), - ], "dshield") + ('array_upsert', 'dshield', {'date': date_str}, + [('set', 'reports', ips[ip_addr]["reports"]), + ('set', 'targets', ips[ip_addr]["targets"])]), + ('setmax', '_ttl.dshield', ttl_date), + ('array_upsert', '_threat_category', {'d': date_str, 'c': 'scan'}, [('set', 'src.dshield', ips[ip_addr]["reports"])]) + ], "dshield") logger.info("Tasks created") def download_feed(): diff --git a/NERDd/misp_receiver.py b/NERDd/misp_receiver.py index cdcc46b7..0052cac6 100644 --- a/NERDd/misp_receiver.py +++ b/NERDd/misp_receiver.py @@ -27,6 +27,7 @@ from common.config import read_config from common.task_queue import TaskQueueWriter from common.utils import int2ipstr +from common.threat_categorization import * running_flag = 
True zmq_alive = False @@ -61,6 +62,15 @@ logger.info("Loading config file {}".format(common_cfg_file)) config.update(read_config(common_cfg_file)) +# Read categorization config +categorization_cfg_file = os.path.join(config_base_path, 'threat_categorization.yml') +logger.info("Loading config file {}".format(categorization_cfg_file)) +config.update(read_config(categorization_cfg_file)) +categorization_config = { + "categories": config.get('threat_categories'), + "malware_families": read_config(config.get('malpedia_family_list_path')) +} + inactive_ip_lifetime = config.get('record_life_length.misp', 180) rabbit_config = config.get("rabbitmq") @@ -99,6 +109,9 @@ THREAT_LEVEL_DICT = {'1': "High", '2': "Medium", '3': "Low", '4': "Undefined"} +############################################################################## +# Main module code + def is_single_ip(ip_to_check): try: _ = ipaddress.IPv4Address(ip_to_check) @@ -265,18 +278,38 @@ def upsert_new_event(event, attrib, sighting_list, role=None): :param role: role of ip_address (src or|and dst) :return: None """ - new_event = create_new_event(event, role if role is not None else get_role_of_ip(attrib['type']), sighting_list) ip_addr = get_ip_address(attrib) + ip_role = role if role is not None else get_role_of_ip(attrib['type']) + new_event = create_new_event(event, ip_role, sighting_list) + live_till = new_event['date'] + timedelta(days=inactive_ip_lifetime) + # create update sets for NERD queue - updates = [] + event_updates = [] for k, v in new_event.items(): - updates.append(('set', k, v)) - live_till = new_event['date'] + timedelta(days=inactive_ip_lifetime) - tq_writer.put_task('ip', ip_addr, [ - ('array_upsert', 'misp_events', {'misp_instance': misp_url, 'event_id': event['id']}, updates), + event_updates.append(('set', k, v)) + updates = [ + ('array_upsert', 'misp_events', {'misp_instance': misp_url, 'event_id': event['id']}, event_updates), ('setmax', '_ttl.misp', live_till), ('setmax', 'last_activity', 
new_event['date']) - ], "misp_receiver") + ] + + # threat categorization updates + for category_data in classify_ip(ip_addr, "misp_receiver", logger, categorization_config, new_event, attrib, ip_role): + subcategory_updates = [] + for subcategory, values in category_data['subcategories'].items(): + subcategory_updates.append(('extend_set', subcategory, values)) + updates.append(( + 'array_upsert', + '_threat_category', + {'d': category_data['date'], 'c': category_data['id']}, + [('add', 'src.misp', 1), *subcategory_updates] + )) + + logger.debug(f"Updates for {ip_addr}:") + logger.debug(updates) + + # put task in queue + tq_writer.put_task('ip', ip_addr, updates, "misp_receiver") def process_sighting_notification(sighting): @@ -287,9 +320,10 @@ def process_sighting_notification(sighting): """ try: # event which attribute was sighted - event = misp_inst.get(sighting['event_id'])['Event'] + event = misp_inst.get_event(sighting['event_id'])['Event'] # get sightings of attribute (rather set actual values of all sightings, than just add or remove 1 sighting) - sighting_list_response = misp_inst.sighting_list(int(sighting['attribute_id']))['response'] + attr_id = int(sighting['attribute_id']) + sighting_list_response = misp_inst.search_sightings(context='attribute', context_id=attr_id) sighting_list = [] for sighting_rec in sighting_list_response: sighting_list.append({'type': sighting_rec['Sighting']['type']}) @@ -409,9 +443,21 @@ def process_edit_of_attribute(json_message): def process_new_attribute(json_message): # change looks like: "to_ids () => (1), distribution () => (5), type () => (hostname)..." 
attrib = json_message['Log']['change'] - attrib_type = re_attrib_type_change.search(attrib).group(1) + try: + attrib_type = re_attrib_type_change.search(attrib).group(1) + except AttributeError: + logger.error("Error", exc_info=True) + logger.error("Used regex: " + re_attrib_type_change.pattern) + logger.error("Searched text: " + attrib) + return if attrib_type in IP_MISP_TYPES: - event_id = re_event_id_change.search(json_message['Log']['change']).group(1) + try: + event_id = re_event_id_change.search(attrib).group(1) + except AttributeError: + logger.error("Error", exc_info=True) + logger.error("Used regex: " + re_attrib_type_change.pattern) + logger.error("Searched text: " + attrib) + return attrib_id = json_message['Log']['model_id'] try: attrib_value = re_attrib_type_value_title.search(json_message['Log']['title']).group(2) @@ -491,7 +537,8 @@ def receive_events(): # 'xxx': "yyy", # ...... }, # 'action': "log"} - notification_prefix, _, notification = message.partition(" ") + notification_prefix, _, notification_str = message.partition(" ") + notification = json.loads(notification_str) # check message prefix, which defines actions if notification_prefix == "misp_json_audit": diff --git a/NERDd/modules/cleaner.py b/NERDd/modules/cleaner.py index 2b1122f7..294baa3a 100644 --- a/NERDd/modules/cleaner.py +++ b/NERDd/modules/cleaner.py @@ -47,6 +47,13 @@ def __init__(self): ('!every1d',), tuple() # No key is changed; some are removed, but there's no way to specify list of keys to delete in advance; anyway it shouldn't be a problem in this case. ) + g.um.register_handler( + self.clear_threat_category, + 'ip', + ('!every1d',), + tuple() + # No key is changed; some are removed, but there's no way to specify list of keys to delete in advance; anyway it shouldn't be a problem in this case. 
def clear_threat_category(self, ekey, rec, updates):
    """
    Handler function to clear old threat category data.

    Requests removal of every item under '_threat_category' whose date
    ('d') is older than the current UTC day minus 'self.max_event_history'.

    :param ekey: (entity_type, key) tuple identifying the processed entity
    :param rec: entity record; only its '_threat_category' list is read
    :param updates: updates which triggered the handler (not used here)
    :return: None for non-IP entities, otherwise a (possibly empty) list of
             'array_remove' update requests, one per expired item
    """
    etype, key = ekey
    if etype != 'ip':
        return None

    # NOTE(review): 'self.max_event_history' must be subtractable from a date
    # (presumably a timedelta set up in __init__, which is not visible here).
    today = datetime.utcnow().date()
    cut_day = (today - self.max_event_history).strftime("%Y-%m-%d")

    # Remove all threat category records with day before cut_day.
    # Thanks to ISO format it's OK to compare dates as strings.
    actions = [
        ('array_remove', '_threat_category', {'d': category_record['d'], 'c': category_record['c']})
        for category_record in rec.get('_threat_category', [])
        if category_record['d'] < cut_day
    ]

    if actions:
        # Each action removes exactly one record, so the number of removed
        # records is len(actions) (there is no extra non-removal action here).
        self.log.debug("Cleaning {}: Removing {} old threat category records".format(key, len(actions)))

    return actions
def create_summary(self, ekey, rec, updates):
    """
    Summarize threat category records - group records by category
    - get total number of reports for each source module
    - compute confidence

    Category confidence (based on reputation score method):
    - take list of records from last 14 days
    - compute a "daily confidence" for each day as:
      - nonlin(num_of_reports) * nonlin(number_of_sources)
      - where nonlin is a nonlinear transformation: 1 - 1/2^x
    - get total confidence as weighted average of all "daily" ones with
      linearly decreasing weight (weight = (14-n)/14 for n=0..13)

    A category with no record inside the 14-day window is left out of the
    summary, so every summary item always carries a 'conf' value.

    :param ekey: (entity_type, key) tuple identifying the processed entity
    :param rec: entity record; its '_threat_category' list is read (and the
                list-valued fields of its items are truncated, see below)
    :param updates: updates which triggered the handler (not used here)
    :return: None for non-IP entities or records without '_threat_category',
             otherwise a single 'set' request for '_threat_category_summary'
             with items sorted by confidence (highest first)
    """
    etype, _ = ekey
    if etype != 'ip':
        return None

    if '_threat_category' not in rec:
        return None  # No threat category records, nothing to do

    subcategory_max_length = 10
    grouped_by_category = {}
    for record in rec['_threat_category']:
        grouped_by_category.setdefault(record['c'], []).append(record)

        # limit the number of subcategory values in each record
        # NOTE(review): this truncates the lists stored in `rec` itself (no
        # copy is made), exactly as the original code did - confirm intended.
        for field, value in record.items():
            if type(value) is list:
                record[field] = value[:subcategory_max_length]

    today = datetime.datetime.utcnow().date()
    DATE_RANGE = 14
    summary = []

    for cat, records in grouped_by_category.items():
        role = self.config.get(f'threat_categories.{cat}.role', '?')
        sources = {}
        subcategories = {}
        sum_weight = 0
        confidence = 0
        for record in records:
            date = record['d']
            date = datetime.date(int(date[0:4]), int(date[5:7]), int(date[8:10]))
            record_age_days = (today - date).days
            if record_age_days >= DATE_RANGE:
                continue  # too old to contribute

            daily_reports = 0
            for source, n_reports in record['src'].items():
                sources[source] = sources.get(source, 0) + n_reports
                daily_reports += n_reports
            daily_confidence = nonlin(daily_reports) * nonlin(len(record['src']))
            weight = float(DATE_RANGE - record_age_days) / DATE_RANGE
            sum_weight += weight
            confidence += daily_confidence * weight

            # Everything except the fixed fields is a subcategory value list
            for field, values in record.items():
                if field in ('d', 'c', 'src'):
                    continue
                subcategories.setdefault(field, set()).update(values)

        if not sum_weight:
            # No record of this category within the window. Previously such an
            # item was appended without 'conf', which made the sort below
            # (and any consumer reading 'conf') fail with KeyError.
            continue

        summary.append({
            'r': role,
            'c': cat,
            'src': sources,
            's': {k: list(v)[:subcategory_max_length] for k, v in subcategories.items()},
            'conf': round(confidence / sum_weight, 2),
        })

    summary.sort(key=lambda item: item['conf'], reverse=True)
    return [('set', '_threat_category_summary', summary)]
in time_str: @@ -77,6 +82,15 @@ def parse_datetime(time_str): logger.info("Loading config file {}".format(common_cfg_file)) config.update(read_config(common_cfg_file)) +# Read categorization config +categorization_cfg_file = os.path.join(config_base_path, 'threat_categorization.yml') +logger.info("Loading config file {}".format(categorization_cfg_file)) +config.update(read_config(categorization_cfg_file)) +categorization_config = { + "categories": config.get('threat_categories'), + "malware_families": read_config(config.get('malpedia_family_list_path')) +} + inactive_pulse_time = config.get('record_life_length.otx', 30) otx_api_key = config.get('otx_api_key', None) @@ -137,11 +151,27 @@ def upsert_new_pulse(pulse, indicator): live_till = current_time + timedelta(days=inactive_pulse_time) else: live_till = parse_datetime(indicator['expiration']) + timedelta(days=inactive_pulse_time) - tq_writer.put_task('ip', ip_addr, [ + + updates = [ ('array_upsert', 'otx_pulses', {'pulse_id': pulse['id']}, updates), ('setmax', '_ttl.otx', live_till), ('setmax', 'last_activity', current_time) - ], "otx_receiver") + ] + + # threat categorization updates + for category_data in classify_ip(ip_addr, "otx_receiver", logger, categorization_config, new_pulse): + subcategory_updates = [] + for subcategory, values in category_data['subcategories'].items(): + subcategory_updates.append(('extend_set', subcategory, values)) + updates.append(( + 'array_upsert', + '_threat_category', + {'d': category_data['date'], 'c': category_data['id']}, + [('add', 'src.otx', 1), *subcategory_updates] + )) + + # put task in queue + tq_writer.put_task('ip', ip_addr, updates, "otx_receiver") def write_time(current_time): diff --git a/NERDd/warden_receiver.py b/NERDd/warden_receiver.py index 95b656ee..9f3bc0cd 100644 --- a/NERDd/warden_receiver.py +++ b/NERDd/warden_receiver.py @@ -26,6 +26,7 @@ # Add to path the "one directory above the current file location" to find modules from "common" sys.path.insert(0, 
os.path.abspath(os.path.join(os.path.dirname(os.path.abspath(__file__)), '..'))) +from common.threat_categorization import * from common.utils import parse_rfc_time import common.config import common.eventdb_psql @@ -456,7 +457,7 @@ def parse_and_validate_timestamp(event, timestamp_name, t_now, max_age): return None -def receive_events(filer_path, eventdb, task_queue_writer, inactive_ip_lifetime, warden_filter=None): +def receive_events(filer_path, eventdb, task_queue_writer, inactive_ip_lifetime, categorization_config, warden_filter=None): # Infinite loop reading events as files in given directory # This loop stops on SIGINT log.info("Reading IDEA files from {}/incoming".format(filer_path)) @@ -551,18 +552,31 @@ def receive_events(filer_path, eventdb, task_queue_writer, inactive_ip_lifetime, for ipv4 in src.get("IP4", []): # TODO check IP address validity log.debug("Updating IPv4 record {}".format(ipv4)) - task_queue_writer.put_task('ip', ipv4, - [ - ('array_upsert', 'events', - {'date': date, 'node': node, 'cat': cat}, - [('add', 'n', 1), ('add', 'conns', conns)]), - ('add', 'events_meta.total', 1), - ('setmax', 'last_activity', end_time), - ('setmax', '_ttl.warden', live_till), - ('setmax', 'last_warden_event', end_time), - ], - "warden_receiver" - ) + updates = [ + ('array_upsert', 'events', + {'date': date, 'node': node, 'cat': cat}, + [('add', 'n', 1), ('add', 'conns', conns)]), + ('add', 'events_meta.total', 1), + ('setmax', 'last_activity', end_time), + ('setmax', '_ttl.warden', live_till), + ('setmax', 'last_warden_event', end_time), + ] + + # threat categorization updates + for category_data in classify_ip(ipv4, "warden_receiver", log, categorization_config, event, src): + subcategory_updates = [] + for subcategory, values in category_data['subcategories'].items(): + subcategory_updates.append(('extend_set', subcategory, values)) + updates.append(( + 'array_upsert', + '_threat_category', + {'d': category_data['date'], 'c': category_data['id']}, + [('add', 
'src.warden', 1), *subcategory_updates] + )) + + # put task in queue + task_queue_writer.put_task('ip', ipv4, updates, "warden_receiver") + for ipv6 in src.get("IP6", []): log.debug( "IPv6 address in Source found - skipping since IPv6 is not implemented yet.") # The record follows:\n{}".format(str(event)), file=sys.stderr) @@ -595,11 +609,21 @@ def receive_events(filer_path, eventdb, task_queue_writer, inactive_ip_lifetime, log.info("Loading config file {}".format(common_cfg_file)) config.update(common.config.read_config(common_cfg_file)) + # Read categorization config + categorization_cfg_file = os.path.join(config_base_path, 'threat_categorization.yml') + log.info("Loading config file {}".format(categorization_cfg_file)) + config.update(common.config.read_config(categorization_cfg_file)) + inactive_ip_lifetime = config.get('record_life_length.warden', 14) warden_filter_rules = config.get('warden_filter', None) warden_nodes_path = config.get('warden_nodes_path', None) rabbit_config = config.get("rabbitmq") filer_path = config.get('warden_filer_path') + categorization_config = { + "categories": config.get('threat_categories'), + "malware_families": common.config.read_config(config.get('malpedia_family_list_path')) + } + if warden_filter_rules: try: @@ -633,4 +657,4 @@ def receive_events(filer_path, eventdb, task_queue_writer, inactive_ip_lifetime, task_queue_writer.connect() signal.signal(signal.SIGINT, stop) - receive_events(filer_path, eventdb, task_queue_writer, inactive_ip_lifetime, warden_filter) + receive_events(filer_path, eventdb, task_queue_writer, inactive_ip_lifetime, categorization_config, warden_filter) diff --git a/NERDd/worker.py b/NERDd/worker.py index 8773e8a3..c5b73aca 100755 --- a/NERDd/worker.py +++ b/NERDd/worker.py @@ -123,7 +123,8 @@ def main(cfg_file, process_index): import modules.intervals_between_events import modules.reserved_ip import modules.ttl_updater - + import modules.threat_category_summary + # Instantiate modules # TODO create 
all modules automatically (loop over all modules.* and find all objects derived from NERDModule) # or take if from configuration @@ -149,6 +150,7 @@ def main(cfg_file, process_index): modules.intervals_between_events.IntervalsBetweenEvents(), modules.reserved_ip.ReservedIPTags(), modules.ttl_updater.TTLUpdater(), + modules.threat_category_summary.ThreatCategorySummary(), ] diff --git a/NERDweb/nerd_main.py b/NERDweb/nerd_main.py index 0d6b3e1d..adf5378d 100644 --- a/NERDweb/nerd_main.py +++ b/NERDweb/nerd_main.py @@ -31,6 +31,7 @@ sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(os.path.abspath(__file__)), '..'))) import common.config import common.task_queue +import common.threat_categorization from common.utils import ipstr2int, int2ipstr, parse_rfc_time from shodan_rpc_client import ShodanRpcClient @@ -69,6 +70,23 @@ p_bl_config = common.config.read_config(p_bl_cfg_file) dnsbl_config = common.config.read_config(dnsbl_cfg_file) +# Read threat categorization config +categorization_cfg_file = os.path.join(cfg_dir, 'threat_categorization.yml') +threat_cat_config = common.config.read_config(categorization_cfg_file) + +# Mapping of source IDs usind in DB (e.g. 
as ttl token) to user readable name +# This list is used to generate: +# - a drop-down menu in search form +# - lists of sources in threat_category tooltips +# (what is defined here appears on the web, in the same order) +SOURCE_NAMES = { + "warden": "Warden", + "bl": "Blacklists", + "dshield": "DShield", + "otx": "OTX", + "misp": "MISP", +} + # Dict: blacklist_id -> parameters # parameters should contain: # all: id, name, descr, feed_type @@ -369,6 +387,11 @@ def nonempty_ip_list_validator(form, field): raise validators.ValidationError("No IP address or CIDR string found") +def to_lower(s): + if isinstance(s, str): + s = s.lower() + return s + # ***** Auxiliary functions ***** def pseudonymize_node_name(name): @@ -841,7 +864,15 @@ class IPFilterForm(FlaskForm): source = SelectMultipleField('Source', [validators.Optional()]) source_op = HiddenField('', default="or") cat = SelectMultipleField('Event category', [validators.Optional()]) # Choices are set up dynamically (see below) + tc_role = SelectMultipleField('Role', [validators.Optional()]) + tc_category = SelectMultipleField('Category', [validators.Optional()]) + tc_subcategory_key = SelectField('Subcategory key', [validators.Optional()]) + tc_subcategory_value = StringField('Subcategory value', [validators.Optional()], filters=[strip_whitespace, to_lower]) + tc_confidence = FloatField('Min category confidence', + [validators.Optional(), validators.NumberRange(0, 1, 'Must be a number between 0 and 1')], + default=0.5) cat_op = HiddenField('', default="or") + tc_category_op = HiddenField('', default="or") node = SelectMultipleField('', [validators.Optional()]) node_op = HiddenField('', default="or") blacklist = SelectMultipleField('Blacklist', [validators.Optional()]) @@ -876,18 +907,15 @@ def __init__(self, *args, **kwargs): self.country.choices = [(i, '{} - {}'.format(i, ctrydata.names[i])) for i in ctrydata.names.keys()] # Load numbers of IPs per data source (also precomputed in DB) - # (Numbers of IPs per 
source are computed from TTL tokens; list of sources to show is hard-coded here, since + # (Numbers of IPs per source are computed from TTL tokens; list of sources to show is hard-coded, since # we don't want to show all used TTL token IDs as data sources.) - # mapping of DB name (ttl token) -> user readable name (what is defined here appears on the web, in the same order) - source_names = { - "warden": "Warden", - "bl": "Blacklists", - "dshield": "DShield", - "otx": "OTX", - "misp": "MISP", - } cnt_by_source = {item["_id"]: item["n"] for item in mongo.db.n_ip_by_ttl.find()} - self.source.choices = [(src_id, '{} ({})'.format(src_name, int(cnt_by_source.get(src_id, 0)))) for src_id,src_name in source_names.items()] + self.source.choices = [(src_id, '{} ({})'.format(src_name, int(cnt_by_source.get(src_id, 0)))) for src_id,src_name in SOURCE_NAMES.items()] + + # Load categorization config to get list of all categories + self.tc_role.choices = [("src", "Source"), ("dst", "Destination")] + self.tc_category.choices = sorted([(cat_id, cat_data['label']) for cat_id, cat_data in threat_cat_config["threat_categories"].items()]) + self.tc_subcategory_key.choices = [("", "--"), ("port", "Port"), ("protocol", "Protocol"), ("malware_family", "Malware family")] # Number of occurrences for blacklists (list of blacklists is taken from configuration) bl_name2num = {item['_id']: int(item['n']) for item in mongo.db.n_ip_by_bl.find()} @@ -967,6 +995,20 @@ def create_query(form): if form.cat.data: op = '$and' if (form.cat_op.data == "and") else '$or' queries.append({op: [{'events.cat': cat} for cat in form.cat.data]}) + if form.tc_role.data or form.tc_category.data or form.tc_subcategory_value.data: + elem_match = {} + if form.tc_confidence.data: + elem_match.update({"conf": {"$gte": float(form.tc_confidence.data)}}) + if form.tc_role.data: + elem_match.update({'$or': [{"r": role} for role in form.tc_role.data]}) + if form.tc_subcategory_key.data and form.tc_subcategory_value.data: + 
# TODO move near the "ip" endpoint
def create_threat_category_table(category_records, min_confidence, max_subcategory_values):
    """Prepare data about threat category tags - for ips.html as well as the table in ip.html

    Each item of `category_records` is a dict with the following fields:
      'r': str - role (src/dst)
      'c': str - category
      'src': dict[str,int] - number of events per source
      's': dict[str,Any] - subcategories/details (proto, port, malware_family)
      'conf': float - confidence

    :param category_records: list of threat category summary records
    :param min_confidence: records with lower 'conf' are skipped
    :param max_subcategory_values: subcategories with more values than this
                                   are shown as "<key>: many"
    :return: list of rows [role, category, subcategory, confidence, tooltip content];
             one row per subcategory, or a single row with an empty
             subcategory when the record has none
    """
    table_rows = []
    for rec in category_records:
        if rec['conf'] < min_confidence:
            continue
        tooltip_content = get_threat_category_tooltip(rec)

        subcategories = list(rec['s'].items())
        if not subcategories:
            # No subcategories -> create single line
            table_rows.append([rec['r'], rec['c'], "", rec['conf'], tooltip_content])
            continue

        # One line per subcategory. All of them are listed; the original
        # iterated over subcategories[1:] and silently dropped the first one.
        for key, values in subcategories:
            if len(values) <= max_subcategory_values:
                subcategory_content = f"{key}: {', '.join(values)}"
            else:
                subcategory_content = f"{key}: many"
            table_rows.append([rec['r'], rec['c'], subcategory_content, rec['conf'], tooltip_content])
    return table_rows
be generated in a Jinja2 template or JavaScript, not here + category_description = threat_cat_config.get(f"threat_categories.{rec['c']}.description", f"ERROR: missing configuration for category '{rec['c']}'") + sources_str = ''.join([f"
  • {SOURCE_NAMES[source]} ({n_reports})
  • " for source, n_reports in sorted(rec['src'].items())]) + return f"Category \"{rec['c']}\":
    {category_description}

    Sources reporting the IP under this category (number of alerts/reports in last 14 days):
    Confidence: {rec['conf']}" + + + @app.route('/_ips_count', methods=["POST"]) def ips_count(): log_ep.log('/ips_count') @@ -1322,6 +1439,10 @@ def ip(ipaddr=None): asn_list.append(asn) ipinfo['asns'] = asn_list + # Create threat category table + #threat_category_table = create_threat_category_table(ipinfo.get('_threat_category_summary', []), 0, 9) + threat_category_data = create_threat_category_data_for_tags(ipinfo.get('_threat_category_summary', []), 0, 9) + # Pseudonymize node names if user is not allowed to see the original names if not g.ac('nodenames'): for evtrec in ipinfo.get('events', []): @@ -1662,8 +1783,14 @@ def map_index(): "ip_rep.csv", "bad_ips.txt", "bad_ips_med_conf.txt", + "ip_category.csv", + "ip_category_table.csv", ] +# Add category blacklist files (created by /scripts/generate_category_blocklist.sh) +BL_FILES = [f"bl_{cat}.txt" for cat in threat_cat_config["threat_categories"] if cat != "unknown"] +FILES += BL_FILES + @app.route('/data/') def data_index(): log_ep.log('/data') @@ -1677,7 +1804,7 @@ def data_index(): file_sizes[f] = os.stat(os.path.join(DATA_DIR, f)).st_size except OSError: file_sizes[f] = None - return render_template("data.html", title=title, file_sizes=file_sizes) + return render_template("data.html", title=title, file_sizes=file_sizes, bl_files=BL_FILES) @app.route('/data/') def data_file(filename): @@ -1739,7 +1866,7 @@ def get_ip_info(ipaddr, full): else: ipinfo = mongo.db.ip.find_one({'_id': ipint}, {'rep': 1, 'fmp': 1, 'hostname': 1, 'bgppref': 1, 'ipblock': 1, 'geo': 1, 'bl': 1, - 'tags': 1}) + 'tags': 1, '_threat_category_summary': 1}) if not ipinfo: log_err.log('404_api_ip_not_found') data['err_n'] = 404 @@ -1860,6 +1987,15 @@ def get_basic_info_dic(val): tags_l.append(d) + threat_category_l = [] + for rec in val.get('_threat_category_summary', []): + threat_category_l.append({ + 'role': rec['r'], + 'category': rec['c'], + 'subcategory': rec['s'], + 'confidence': rec['conf'], + }) + data = { 'ip': val['_id'], 
'rep': val.get('rep', 0.0), @@ -1870,14 +2006,15 @@ def get_basic_info_dic(val): 'asn': val.get('asn', []), 'geo': geo_d, 'bl': bl_l, - 'tags': tags_l + 'tags': tags_l, + 'threat_category': threat_category_l } return data def get_basic_info_dic_short(val): - # only 'rep' and 'tags' fields + # only 'rep', 'tags' and 'threat_category' fields tags_l = [] for l in val.get('tags', []): d = { @@ -1886,10 +2023,20 @@ def get_basic_info_dic_short(val): } tags_l.append(d) + threat_category_l = [] + for rec in val.get('_threat_category_summary', []): + threat_category_l.append({ + 'role': rec['r'], + 'category': rec['c'], + 'subcategory': rec['s'], + 'confidence': rec['conf'], + }) + data = { 'ip': val['_id'], 'rep': val.get('rep', 0.0), - 'tags': tags_l + 'tags': tags_l, + 'threat_category': threat_category_l } return data @@ -2031,6 +2178,13 @@ def get_full_info(ipaddr=None): 'total7': val.get('events_meta', {}).get('total7', 0.0), 'total30': val.get('events_meta', {}).get('total30', 0.0), }, + 'threat_category': [{ + 'role': rec['r'], + 'category': rec['c'], + 'subcategory': rec['s'], + 'confidence': rec['conf'], + 'sources': rec['src'] + } for rec in val.get('_threat_category_summary', [])], } return Response(json.dumps(data), 200, mimetype='application/json') @@ -2372,4 +2526,4 @@ def get_shodan_response(ipaddr=None): config['login']['methods'] = {} # Run built-in server app.run(host="127.0.0.1", debug=True) - \ No newline at end of file + diff --git a/NERDweb/static/ips.js b/NERDweb/static/ips.js index 7696a2d2..6600fe58 100644 --- a/NERDweb/static/ips.js +++ b/NERDweb/static/ips.js @@ -7,6 +7,8 @@ function set_up_search_form() { $("select#country").multiselect({texts: {placeholder: "Any"}, search: true, selectAll: true}); $("select#source").multiselect({texts: {placeholder: "Any"}, search: true}); $("select#cat").multiselect({texts: {placeholder: "Any"}, search: true}); + $("select#tc_role").multiselect({texts: {placeholder: "Any"}, search: true}); + 
$("select#tc_category").multiselect({texts: {placeholder: "Any"}, search: true}); $("select#node").multiselect({texts: {placeholder: "Any"}, search: true}); $("select#blacklist").multiselect({texts: {placeholder: "Any"}, search: true}); $("select#tag").multiselect({texts: {placeholder: "Any"}, search: true}); @@ -106,6 +108,7 @@ function set_up_search_form() { set_up_op_button("#source_op_button", "#source_op", "OR: At least one of the selected categories", "AND: All selected categories") set_up_op_button("#cat_op_button", "#cat_op", "OR: At least one of the selected categories", "AND: All selected categories") + set_up_op_button("#tc_category_op_button", "#tc_category_op", "OR: At least one of the selected categories", "AND: All selected categories") set_up_op_button("#node_op_button", "#node_op", "OR: At least one of the selected nodes", "AND: All selected nodes") set_up_op_button("#bl_op_button", "#bl_op", "OR: At least one of the selected blacklists", "AND: All selected blacklists") set_up_op_button("#tag_op_button", "#tag_op", "OR: At least one of the selected tags", "AND: All selected tags") diff --git a/NERDweb/static/main.js b/NERDweb/static/main.js index e362b81b..222b7d01 100644 --- a/NERDweb/static/main.js +++ b/NERDweb/static/main.js @@ -2,7 +2,7 @@ function create_event_table(data) { /* data are "dataset" field of a DOM node with "data-" attributes set */ if (data.table == "") { - return "No events"; + return "No Warden events"; } var cats = data.cats.split(","); var dates = data.dates.split(","); @@ -56,6 +56,15 @@ $(function() { "ui-tooltip": "events_tooltip" } }); + /* jQuery UI tooltip at "threat category" cell with info about each category */ + $( ".threat_category_tooltip" ).tooltip({ + items: ".threat_category_tooltip", + track: false, + show: false, + hide: false, + position: {my: "left bottom", at: "left-7px top-2px", collision: "flipfit"}, + content: function() { return $(this).attr('title') }, + }); /* jQuery UI tooltip at times with 
"timeago" */ $( ".time" ).tooltip({ items: ".time", diff --git a/NERDweb/static/style.css b/NERDweb/static/style.css index fdfbc7e5..bc73f20e 100644 --- a/NERDweb/static/style.css +++ b/NERDweb/static/style.css @@ -14,6 +14,11 @@ h1 { margin-bottom: 0.25em; } +h2 { + font-size: larger; + margin-top: 1.5em; +} + hr { border: 0; border-top: 2px solid #0061a2; @@ -30,6 +35,9 @@ hr { padding: 0.25em 0.4em; max-width: 80em; } +.ui-tooltip ul { + margin: 0; +} p.error { color: #900; @@ -484,6 +492,70 @@ td.country a { color: inherit; } +.threat_tag { + display: inline-table; + font-size: 1em; + color: #000; + /* color will be rewritten by inline styles according to the threat category */ + background: #fff; + border-radius: 5px 0 0 5px; + border: 0; + border-right: 3px solid; + border-collapse: collapse; + box-shadow: 1px 1px 1px 1px rgba(0,0,0,0.1); + margin: 0 0.2em; +} +.threat_tag td { + border: 0; + border-right: 1px solid #777; +} +.threat_tag td:nth-child(3) { + font-size: 0.8em; + border-right: 0; +} + + +/* +.threat_category_preview table { + text-align: left; + border: hidden; +} +.threat_category_preview table td { + min-width: 1.5em; + padding: 0 0.2em; + border-width: 0px 0px 0px 0px; +} +.threat_category_preview table td + td { + color: #222; + border-left: 2px dotted #888; + padding: 0 0.5em; + min-width: 9.5em; +} +*/ +.threat_category_detail table { + text-align: left; + border: solid #000; + border-width: 1px 1px 1px 1px; + border-collapse: collapse; +} +.threat_category_detail table td, +.threat_category_detail table th { + border: solid #000; + border-width: 1px 0px 0px 0px; + padding: 0.1em 0.4em; + height: 2em; + text-align: center; +} +.threat_category_detail table th { + border-width: 1px 1px 1px 1px; +} +.threat_category_detail table td:nth-child(n+3), /* 3rd and following cells */ +.threat_category_detail table th:nth-child(n+3) +{ + text-align: left; +} + + td.events { text-align: right; padding-right: 0; @@ -1007,6 +1079,10 @@ ul.data-list li 
ul li { color: #777; } +.data-list+p { + margin-top: 2em; +} + /***********************************************/ /* NERD status block */ @@ -1245,6 +1321,75 @@ ul.data-list li ul li { font-weight: bold; } + #threat_category + { + display: flex; + flex-direction: row; + width: 813px; + justify-content: space-between; + padding-bottom: 25px; + } + + #threat_category p + { + font-weight: bold; + padding-right: 10px; + } + + #tc_role_label + { + display: block; + padding-bottom: 3px; + } + + #tc_role_wrap, + #tc_role_wrap button, + #tc_role_wrap .ms-options, + #tc_role_wrap .ms-options-wrap span + { + width: 115px; + } + + #tc_category_label + { + display: block; + padding-bottom: 3px; + } + + #tc_category_wrap, + #tc_category_wrap button, + #tc_category_wrap .ms-options, + #tc_category_wrap .ms-options-wrap span + { + width: 170px; + } + + #tc_subcategory_label + { + display: block; + padding-bottom: 3px; + } + + #tc_subcategory_wrap + { + width: 222px; + } + + #tc_subcategory_wrap select + { + width: 115px; + } + + #tc_confidence_label + { + display: block; + padding-bottom: 3px; + } + + #tc_confidence_wrap + { + width: 90px + } #searchForm { @@ -1311,7 +1456,7 @@ ul.data-list li ul li { #narrow .ms-options-wrap span { - width: 500px; + width: 180px; } .center-row diff --git a/NERDweb/templates/data.html b/NERDweb/templates/data.html index da87b9f7..bd8440e4 100644 --- a/NERDweb/templates/data.html +++ b/NERDweb/templates/data.html @@ -3,6 +3,7 @@

    Downloadable data

    +

    Reputation score

    +

    Threat categorization

    + + All files are updated once per hour. Please, don't download the data more often than once per hour!
    We recommend downloading it a few minutes after the start of each hour (i.e. between xx:01 and xx:05). +

    All files are updated once per hour.

    + {% endblock %} diff --git a/NERDweb/templates/ip.html b/NERDweb/templates/ip.html index a2bcb602..833b2ae8 100644 --- a/NERDweb/templates/ip.html +++ b/NERDweb/templates/ip.html @@ -155,6 +155,27 @@

    IP address

    {% set dbl = ipinfo.pop('dbl') %} {% endif %} +{# Threat category summary #} +

    Threat categories

    +
    + + + {% if threat_category_data %} + {% for tag in threat_category_data %} {# items in "tag": role, role_color, cat, cat_color, subcats, conf, tooltip #} + + + + + + + {% endfor %} + {% else %} + + {% endif %} +
    TLRoleCategoryDetails
    {{ (tag.conf*100)|round|int }}{{ tag.role|safe }}{{ tag.cat|safe }}{% for subcat in tag.subcats %}{{ subcat|safe }}
    {% endfor %}
    No threat category tags assigned
    +
    +
    + {# Warden events #} {% if ipinfo.events %}
    Warden events ({{ipinfo.events_meta.total}})
    @@ -180,6 +201,7 @@

    IP address

    {% endif %} {# MISP events #} +{# TODO: filter "showable" events in backend! #} {% if ipinfo.misp_events %} {% set misp_events = namespace(showable=0) -%} {% for misp_event in ipinfo.misp_events -%} @@ -187,10 +209,10 @@

    IP address

    {% set misp_events.showable = misp_events.showable + 1 -%} {% endif -%} {% endfor -%} - {% if misp_events.showable %} + {% if misp_events.showable > 0 %}
    MISP events
    1%} class="scrollable"{% endif %}> - {% for misp_event in val|sort(attribute='date', reverse=True) %} + {% for misp_event in ipinfo.misp_events|sort(attribute='date', reverse=True) %} {% if misp_event.tlp == "white" or (misp_event.tlp == "green" and ac('tlp-green')) %}
    [{{ misp_event.event_id }}] {{ misp_event.pop('date', 'no date') }} | {{ misp_event.pop('info', 'no info') }} diff --git a/NERDweb/templates/ips.html b/NERDweb/templates/ips.html index f96de341..76d08474 100644 --- a/NERDweb/templates/ips.html +++ b/NERDweb/templates/ips.html @@ -154,7 +154,62 @@

    Search IP addresses by ...

    {% endif %} - + +
    +
    +

    Threat category

    +
    +
    + + Role +
    + +
    + Select IP addresses with threat category records matching the selected role. +
    +
    +
    + {{ form.tc_role() }} +
    +
    + + Category +
    + +
    + Select IP addresses with threat category records matching the selected category. +
    +
    +
    OR
    AND
    +
    + {{ form.tc_category() }}{{ form.tc_category_op() }} +
    +
    + + Subcategory +
    + +
    + Select IP addresses with threat category records matching the selected subcategory. +
    +
    +
    + {{ form.tc_subcategory_key() }} = {{ formfield(form.tc_subcategory_value, size=8) }} +
    +
    + + Confidence +
    + +
    + Minimum category confidence. +
    +
    +
    + {{ formfield(form.tc_confidence, size=8) }} +
    +
    +

    Sorting options

    @@ -264,6 +319,7 @@

    Search IP addresses by ...

    --> {% if ac('fmp') %}{% endif %} + @@ -311,6 +367,23 @@

    Search IP addresses by ...

    {{ "%.3f"|format(ip.fmp.general) if fmp is defined else "---" }} {% endif %} +
    Device typeRep.(?)FMPThreat category Other properties Time added Last activity +
    + {% if ip._threat_category_data_for_tags %} + {% for tag in ip._threat_category_data_for_tags %} + + + + + {% if tag.subcats -%} + + {%- endif %} + +
    {{ tag.role|safe }}{{ tag.cat|safe }}{% for subcat in tag.subcats %}{{ subcat|safe }}
    {% endfor %}
    + {% endfor %} + {% endif %} +
    +
    {% if ip.bl -%} {% set bl_cnt = ip.bl|selectattr("v")|list|length -%} @@ -337,7 +410,8 @@

    Search IP addresses by ...

    {% if ip.open_ntp %}Open NTP{% endif %} {% if ip.open_snmp %}Open SNMP{% endif %} #} - + + {# {% if ip.tags %} {% for tag_id,tag_param in ip.tags.items() %} {% if tag_id in config_tags and "name" in config_tags[tag_id] %} @@ -373,7 +447,7 @@

    Search IP addresses by ...

    {% endif %} {% endfor %} {% endif %} - + #} {% if ip.shodan %} {% if ip.shodan.ports %} {{ip.shodan.ports|join_max(5)}}{% endif %} {% if ip.shodan.tags %}{{ip.shodan.tags|join(', ')}}{% endif %} diff --git a/common/config.py b/common/config.py index aff7d800..e79f007f 100644 --- a/common/config.py +++ b/common/config.py @@ -21,15 +21,20 @@ def hierarchical_get(self, key, default=NoDefault): instead. """ d = self + full_key = key try: while '.' in key: first_key, key = key.split('.', 1) d = d[first_key] - return d[key] + result = d[key] + if isinstance(result, dict): + return HierarchicalDict(result) + else: + return result except (KeyError, TypeError): pass # not found - continue below if default is NoDefault: - raise MissingConfigError("Mandatory configuration element is missing: " + key) + raise MissingConfigError("Mandatory configuration element is missing: " + full_key) else: return default diff --git a/common/threat_categorization.py b/common/threat_categorization.py new file mode 100644 index 00000000..f6a4778e --- /dev/null +++ b/common/threat_categorization.py @@ -0,0 +1,195 @@ +import yaml +import ast +import re +from datetime import datetime + +from .utils import parse_rfc_time + + +class ClassifiableEvent: + def __getattr__(self, name): + """ + Override __getattr__ so that no error is raised when a trigger tries to use non-existing attribute + :param name: Name of the attribute + :return: Value of the attribute (or None if it does not exist) + """ + return self.__dict__[name] if name in self.__dict__ else None + + def __str__(self): + """ + Override __str__ for easier logging of assigned categories + :param name: Name of the attribute + :return: String representation of the object's attribute dictionary + """ + return str(self.__dict__) + + def __init__(self, module_name=None, *args): + """ + Initialize the event (fill metadata from source module) + :param module_name: Name of the attribute + :param *args: Module specific attributes + :return: + """ 
+ init_fn = getattr(self, f"init_{module_name}") + init_fn(*args) + + def init_warden_receiver(self, event, source): + """ + Fill in metadata from a warden event + :param event: Source event + :return: + """ + detect_time = parse_rfc_time(event["DetectTime"]) + self.date = detect_time.strftime("%Y-%m-%d") + self.categories = event.get('Category', []) + self.description = event.get("Description", "") + self.note = event.get("Note", "") + self.ip_info = source.get('Note', "") + self.source_types = source.get('Type', []) + target_ports = [] + protocols = [] + for source in event.get('Source', []): + protocols += source.get('Proto', []) + for target in event.get('Target', []): + target_ports += target.get('Port', []) + protocols += target.get('Proto', []) + self.target_ports = [str(port) for port in set(target_ports)] + self.protocols = list(set(protocols) - {'tcp', 'udp'}) # don't include L4 protocols (TCP, UDP), keep just the application layer ones + + def init_otx_receiver(self, pulse): + """ + Fill in metadata from an OTX pulse + :param pulse: Source pulse + :return: + """ + self.date = datetime.strftime(pulse.get('pulse_modified', datetime.now()), "%Y-%m-%d") + self.indicator_role = str(pulse.get('indicator_role', None)) + self.ip_info = str(pulse.get('indicator_title', None)) + self.description = str(pulse.get('pulse_name', None)) + self.protocols = [] + self.target_ports = [] + + def init_misp_receiver(self, event, attrib, ip_role): + """ + Fill in metadata from a MISP event + :param event: Source event + :param attrib: Attribute with the source IP + :param ip_role: Role of the IP address (src/dst/both) + :return: + """ + self.date = datetime.strftime(attrib.get('date', datetime.now()), "%Y-%m-%d") + self.tags = [tag["name"] for tag in event.get('tag_list', [])] + self.description = event.get('info', "") + self.ip_info = attrib.get('comment', "") + self.ip_role = ip_role + self.protocols = [] + self.target_ports = [] + try: + if attrib['type'] == "ip-dst|port": 
def classify_ip(ip_addr, module_name, logger, config, *args):
    """
    Assign a threat category based on the information provided in the incoming event

    For every configured category, its "general" triggers and the triggers of the given
    source module are evaluated in order; the first trigger evaluated as True assigns
    the category (at most one record per category).

    :param ip_addr: Classified IP address (used in log messages only)
    :param module_name: Name of the source module (selects the event initializer and triggers)
    :param logger: Logger of the source module
    :param config: Dict with "categories" (category ID -> category parameters) and
                   "malware_families"; passed on to eval_trigger()
    :param *args: Module specific arguments passed to ClassifiableEvent
    :return: List of assigned categories - dicts with keys "date", "id", "role", "subcategories".
             Never empty: if nothing matches (or classification fails), a single "unknown"
             record is returned.
    """
    output = []
    event = None  # stays None if the event cannot be constructed
    try:
        event = ClassifiableEvent(module_name, *args)
        for category_id, category_params in config["categories"].items():
            triggers = category_params.get("triggers") or {}
            category_triggers = triggers.get("general", "False").split("\n") + \
                                triggers.get(module_name, "False").split("\n")
            for trigger in category_triggers:
                result, subcategories = eval_trigger(trigger, event, category_params, config, logger)
                if result is True:
                    output.append({
                        "date": event.date,
                        "id": category_id,
                        "role": category_params["role"],
                        "subcategories": subcategories
                    })
                    break
    except Exception as e:
        logger.error(f"Error in threat category classification for IP {ip_addr}: {e}")
    if not output:
        # The event may not exist (its construction failed) or may have no date;
        # fall back to the current date so that callers always get a usable record.
        event_date = event.date if event is not None else None
        output.append({
            "date": event_date or datetime.now().strftime("%Y-%m-%d"),
            "id": "unknown",
            "role": "src",
            "subcategories": {}
        })
    logger.debug(f"Threat category classification for {ip_addr}: {output}; Event info: {event}")
    return output
def eval_trigger(trigger, event, category_params, config, logger):
    """
    Evaluate a category trigger

    A trigger has the form "<condition>" or "<condition> -> <dict literal>". The condition is
    a Python expression; the trigger matches only if it evaluates to exactly True. The optional
    dict literal assigns subcategory values. The separator "->" must not occur in the condition.

    :param trigger: Trigger to be evaluated (blank triggers never match)
    :param event: Source event (instance of ClassifiableEvent) from which the trigger reads data
    :param category_params: Category parameters (e.g. list of subcategories)
    :param config: Categorization config ("malware_families" is used for the malware_family subcategory)
    :param logger: Source module logger
    :return: Result of the evaluation (True/False), dictionary with subcategory assignments
             (subcategories without any value are left out)
    """
    result = False
    required_subcategories = category_params.get("subcategories") or []
    subcategories = {s: [] for s in required_subcategories}

    if not trigger or not trigger.strip():
        # e.g. a blank line in the list of triggers - nothing to evaluate
        return False, {}

    try:
        condition, _, assignment = trigger.partition("->")
        # SECURITY NOTE: eval() runs arbitrary Python code - triggers must only come from
        # a trusted configuration file, never from external input.
        # A single explicit namespace is used (module globals + the names below), so that
        # 'event' is visible also inside comprehensions/lambdas used in a trigger.
        namespace = dict(globals())
        namespace.update(trigger=trigger, event=event, category_params=category_params,
                         config=config, logger=logger)
        if eval(condition, namespace) is True:
            result = True
            if assignment.strip():
                subcategories.update(ast.literal_eval(assignment.strip()))
    except Exception as e:
        logger.error(f"Error when evaluating category trigger ({trigger}): {e}")
        logger.error(f"Event info: {event}")

    if result is True:
        # dict.fromkeys() removes duplicates while keeping a stable (first-seen) order
        if "port" in required_subcategories:
            ports = list(subcategories["port"]) + list(event.target_ports or [])
            subcategories["port"] = list(dict.fromkeys(ports))
        if "protocol" in required_subcategories:
            protocols = list(subcategories["protocol"]) + list(event.protocols or [])
            subcategories["protocol"] = list(dict.fromkeys(protocols))
        if "malware_family" in required_subcategories:
            families = list(subcategories["malware_family"])
            text = f"{event.description};{event.ip_info}"
            for family_id, family_data in (config.get("malware_families") or {}).items():
                if match_str(family_data["common_name"], text):
                    families.append(family_id.lower())
            subcategories["malware_family"] = list(dict.fromkeys(families))

    # drop subcategories with no values
    subcategories = {key: values for key, values in subcategories.items() if values}
    return result, subcategories
# Characters ignored by match_str()
_MATCH_IGNORED_CHARS = str.maketrans("", "", "_.-")


def _simplify_for_match(text):
    """Trim surrounding whitespace, remove '_', '.' and '-' and convert to lower case."""
    return text.strip().translate(_MATCH_IGNORED_CHARS).lower()


def match_str(str_a, str_b):
    """
    Approximate (sub)string matching

    Tests whether str_a occurs in str_b, ignoring character casing, leading/trailing
    whitespace and the characters '_', '.' and '-' (whitespace inside the strings is
    significant).

    :param str_a: String to search for
    :param str_b: String to search in
    :return: True if simplified str_a is a substring of simplified str_b. False if there is
             nothing left to search for (str_a is None, empty or consists of ignored
             characters only) - an empty string would otherwise match any text.
    """
    if not str_a or not str_b:
        return False
    needle = _simplify_for_match(str_a)
    if not needle:
        return False
    return needle in _simplify_for_match(str_b)
Supported syntax: +# - triggers can have two parts separated by '->', both use standard Python syntax +# - first part is mandatory and should resolve to either True or False +# - second part is optional and contains a dictionary definition, which is used to specify subcategories +# +# Evaluation: +# - triggers are evaluated by source modules when a new event is received (for the blacklists module, +# 'event.description' holds the blacklist ID, which allows blacklist -> category mapping) +# - IP is assigned a category if any of the related statements resolve to True +# - if required by the category configuration, the IP is also assigned subcategories based on the second +# part of the statement (can be empty) +# - within each statement it is possible to access an 'event' object (instance of ClassifiableEvent), +# which represents the event that is currently being classified. +# - event properties: +# - date: Date of the event (YYYY-MM-DD) +# - description: Event description (string) +# - ip_info: Additional info about the IP (string, e.g. attribute comment from MISP) +# - categories/tags/indicator_role: Event categories (list), tags (list) or indicator role (string) that can be used for classification +# - protocols: List of protocols used by the IP +# - target_ports: List of target ports used by the IP + +threat_categories: + unknown: + role: src + description: The IP was reported as a source of malicious/unexpected/rogue packets, but without any further specification. 
+ label: Scanning + color: "#aaffff" + subcategories: + - port + triggers: + general: |- + bool(re.findall(r'(?i)scanning|scanner|probing', event.ip_info + event.description)) + warden_receiver: |- + 'Recon.Scanning' in event.categories + otx_receiver: |- + bool(re.findall(r'(?i)scanning|scanner|probing', event.indicator_role)) + misp_receiver: |- + any([bool(re.findall(r'(?i)scanning|scanner|probing', tag)) for tag in event.tags]) + blacklists: |- + event.ip_info == 'crowdsecurity/iptables-scan-multi_ports' + event.ip_info == 'crowdsecurity/http-crawl-non_statics' + event.ip_info == 'crowdsecurity/http-path-traversal-probing' + event.ip_info == 'crowdsecurity/http-admin-interface-probing' + event.ip_info == 'crowdsecurity/http-probing' + event.ip_info == 'crowdsecurity/http-sensitive-files' + + login: + role: src + description: The IP tries to access password-protected services without authorization (e.g. dictionary or bruteforce attacks, login attempts on honeypots). + label: Login attempts + color: "#55ddaa" + subcategories: + - protocol + - port + triggers: + general: |- + bool(re.findall(r'(?i)ssh.*(brute[\s_-]?force|login|intrusion|honeypot)', event.ip_info + event.description)) -> {'protocol': ['ssh']} + bool(re.findall(r'(?i)rdp.*(brute[\s_-]?force|login|intrusion|honeypot)', event.ip_info + event.description)) -> {'protocol': ['rdp']} + bool(re.findall(r'(?i)telnet.*(brute[\s_-]?force|login|intrusion|honeypot)', event.ip_info + event.description)) -> {'protocol': ['telnet']} + bool(re.findall(r'(?i)vnc.*(brute[\s_-]?force|login|intrusion|honeypot)', event.ip_info + event.description)) -> {'protocol': ['vnc']} + bool(re.findall(r'(?i)redis.*(brute[\s_-]?force|login|intrusion|honeypot)', event.ip_info + event.description)) -> {'protocol': ['redis']} + bool(re.findall(r'(?i)postgresql.*(brute[\s_-]?force|login|intrusion|honeypot)', event.ip_info + event.description)) -> {'protocol': ['postgresql']} + warden_receiver: |- + 'Attempt.Login' in event.categories + 
'Intrusion.UserCompromise' in event.categories + 'Intrusion.AdminCompromise' in event.categories + otx_receiver: |- + bool(re.findall(r'(?i)brute[\s_-]?force', event.indicator_role)) + misp_receiver: |- + any([bool(re.findall(r'(?i)login.*attempt', tag)) for tag in event.tags]) + any([bool(re.findall(r'(?i)brute[\s_-]?force', tag)) for tag in event.tags]) + blacklists: |- + bool(re.findall(r'(?i)brute[\s_-]?force.*ssh', event.ip_info)) -> {'protocol': ['ssh']} + bool(re.findall(r'(?i)brute[\s_-]?force.*ftp', event.ip_info)) -> {'protocol': ['ftp']} + bool(re.findall(r'(?i)brute[\s_-]?force.*sip', event.ip_info)) -> {'protocol': ['sip']} + event.description == 'blocklist_de-ssh' -> {'protocol': ['ssh']} + event.description == 'charles_the_haleys_ssh_dico_ips' -> {'protocol': ['ssh']} + event.description == 'charles_the_haleys_smtp_dico_ips' -> {'protocol': ['smtp']} + event.description == 'dataplane_org_sshclient' -> {'protocol': ['ssh']} + event.description == 'dataplane_org_sshpwauth' -> {'protocol': ['ssh']} + event.description == 'dataplane_org_telnet_login' -> {'protocol': ['telnet']} + event.description == 'bruteforceblocker' + event.description == 'blocklist_de-bruteforcelogin' + event.ip_info == 'crowdsecurity/http-generic-bf' -> {'protocol': ['http']} + event.ip_info.startswith('crowdsecurity/ssh-') -> {'protocol': ['ssh']} + + ddos: + role: src + description: The IP has been observed as a source of volumetric (D)DoS attacks. 
+ label: DDoS + color: "#c44a48" + triggers: + general: |- + bool(re.findall(r'(?i)http.*flood', event.ip_info + event.description)) -> {'protocol': ['http']} + bool(re.findall(r'(?i)dns.*flood', event.ip_info + event.description)) -> {'protocol': ['dns']} + bool(re.findall(r'(?i)udp.*flood', event.ip_info + event.description)) -> {'protocol': ['udp']} + bool(re.findall(r'(?i)(ping|icmp).*flood', event.ip_info + event.description)) -> {'protocol': ['icmp']} + bool(re.findall(r'(?i)syn.*flood', event.ip_info + event.description)) -> {'protocol': ['tcp']} + warden_receiver: |- + 'Availability.DoS' in event.categories + 'Availability.DDoS' in event.categories + misp_receiver: |- + any([bool(re.findall(r'(?i)d?dos', tag)) for tag in event.tags]) + any([bool(re.findall(r'(?i)denial[\s_-]of[\s_-]service', tag)) for tag in event.tags]) + blacklists: |- + bool(re.findall(r'(?i)d?dos', event.ip_info)) + + ddos-amplifier: + role: dst + description: The IP runs a service which can be (and often is) misused as an amplifier for DDoS attacks, e.g. open DNS resolvers, NTP servers, memcached, etc. 
+ label: DDoS amplifier + color: "#ff9769" + subcategories: + - protocol + triggers: + general: |- + bool(re.findall(r'(?i)(Open|Abusable)[\s_-]DNS', event.ip_info + event.description)) -> {'protocol': ['dns']} + bool(re.findall(r'(?i)(Open|Abusable)[\s_-]Memcached', event.ip_info + event.description)) -> {'protocol': ['memcached']} + bool(re.findall(r'(?i)(Open|Abusable)[\s_-]NTP', event.ip_info + event.description)) -> {'protocol': ['ntp']} + warden_receiver: |- + ('Availability.DoS' in event.categories or 'Availability.DDoS' in event.categories) and bool(re.findall(r'(?i)dns.*amplification', event.description + event.note)) -> {'protocol': ['dns']} + 'Vulnerable.Config' in event.categories and 'dns' in event.protocols -> {'protocol': ['dns']} + 'Vulnerable.Config' in event.categories and 'ntp' in event.protocols -> {'protocol': ['ntp']} + 'Vulnerable.Config' in event.categories and 'memcached' in event.protocols -> {'protocol': ['memcached']} + 'Backscatter' in event.ip_info and 'dns' in event.protocols -> {'protocol': ['dns']} + 'Backscatter' in event.ip_info and 'ntp' in event.protocols -> {'protocol': ['ntp']} + 'Backscatter' in event.ip_info and 'memcached' in event.protocols -> {'protocol': ['memcached']} + + spam: + role: src + description: The IP is sending spam. + label: Spam + color: "#2255bb" + triggers: + general: |- + bool(re.findall(r'(?i)send.*spam', event.ip_info + event.description)) + warden_receiver: |- + 'Abusive.Spam' in event.categories + any([type == 'Spam' for type in event.source_types]) + misp_receiver: |- + any([bool(re.findall(r'(?i)spam', tag)) for tag in event.tags]) + blacklists: |- + event.description == 'sblam_ips' + event.description == 'psbl' + event.description == 'spamhaus_edrop' + bool(re.findall(r'(?i)send.*spam', event.ip_info)) + event.ip_info == 'crowdsecurity/postfix-spam' + + malware_distribution: + role: dst + description: The IP is used to distribute malware (e.g. 
hosts an HTTP URL from which a malware is being downloaded). + label: Malware distribution + color: "#dd77ff" + subcategories: + - malware_family + triggers: + general: |- + bool(re.findall(r'(?i)malware.*host', event.ip_info + event.description)) + bool(re.findall(r'(?i)malware.*download', event.ip_info + event.description)) + warden_receiver: |- + any([type == 'Malware' for type in event.source_types]) + otx_receiver: |- + bool(re.findall(r'(?i)malware.*host', event.indicator_role)) + bool(re.findall(r'(?i)malware.*download', event.indicator_role)) + misp_receiver: |- + any([bool(re.findall(r'(?i)(malware|trojan|ransomware)', tag)) for tag in event.tags]) and event.ip_role == "dst" + blacklists: |- + event.description == 'urlhaus_ips' + + cc: + role: dst + description: The IP is used as a Command&Control server for a botnet/malware. + label: Command and control + color: "#ff77dd" + subcategories: + - malware_family + triggers: + general: |- + bool(re.findall(r'(?i)command.*control', event.ip_info + event.description)) + bool(re.findall(r'(?i)botnet[\s_-]cc', event.ip_info + event.description)) + bool(re.findall(r'(?i)c2[\s_-]server', event.ip_info + event.description)) + bool(re.findall(r'(?i)c&?c[\s_-]server', event.ip_info + event.description)) + warden_receiver: |- + any([type == 'CC' for type in event.source_types]) + otx_receiver: |- + bool(re.findall(r'(?i)command.*control', event.indicator_role)) + bool(re.findall(r'(?i)c2', event.indicator_role)) + bool(re.findall(r'(?i)c&?c', event.indicator_role)) + misp_receiver: |- + any([bool(re.findall(r'(?i)command.*control', tag)) for tag in event.tags]) + any([bool(re.findall(r'(?i)c2', tag)) for tag in event.tags]) + any([bool(re.findall(r'(?i)c&c', tag)) for tag in event.tags]) + blacklists: |- + event.description == 'feodo' -> {'malware_family': ['win.feodo']} + event.description == 'bambenek_c2' + + botnet_drone: + role: src + description: The IP is acting as a bot/drone of a botnet. 
+ label: Botnet drone + color: "#d090d0" + subcategories: + - malware_family + triggers: + general: |- + bool(re.findall(r'(?i)botnet.*drone', event.ip_info + event.description)) + bool(re.findall(r'(?i)botnet.*member', event.ip_info + event.description)) + warden_receiver: |- + 'Intrusion.Botnet' in event.categories + any([type == 'Botnet' for type in event.source_types]) + misp_receiver: |- + any([bool(re.findall(r'(?i)botnet', tag)) for tag in event.tags]) + blacklists: |- + event.description == 'mirai_tracker_ips' -> {'malware_family': ['elf.mirai']} + + phishing_site: + role: dst + description: The IP is hosting a phishing website. + label: Phishing site + color: "#dddd00" + triggers: + general: |- + bool(re.findall(r'(?i)phishing.*site', event.ip_info + event.description)) + warden_receiver: |- + any([type == 'Phishing' for type in event.source_types]) + misp_receiver: |- + any([bool(re.findall(r'(?i)phishing.*site', tag)) for tag in event.tags]) + any([bool(re.findall(r'(?i)phishing', tag)) for tag in event.tags]) and event.ip_role == "dst" + blacklists: |- + event.description == 'openphish' + + exploit: + role: src + description: The IP is attempting to exploit known vulnerabilities. 
+ label: Exploit + color: "#22ee88" + subcategories: + - protocol + triggers: + general: |- + bool(re.findall(r'(?i)attempt.*exploit', event.ip_info + event.description)) + warden_receiver: |- + 'Attempt.Exploit' in event.categories + otx_receiver: |- + 'Apache honeypot logs' in event.description -> {'protocol': ['http']} + bool(re.findall(r'(?i)exploit', event.indicator_role)) + misp_receiver: |- + any([bool(re.findall(r'(?i)exploit', tag)) for tag in event.tags]) + 'CERT-XLM:intrusion-attempts="new-attack-signature"' in event.tags + 'circl:incident-classification="XSS"' in event.tags + 'circl:incident-classification="sql-injection"' in event.tags + blacklists: |- + bool(re.findall(r'(?i)CVE[-_]20', event.ip_info)) + event.ip_info == 'http-sqli-probing' -> {'protocol': ['http']} + event.ip_info == 'http-xss-probing' -> {'protocol': ['http']} + event.ip_info == 'http-backdoors-attempts' -> {'protocol': ['http']} diff --git a/install/cron/nerd b/install/cron/nerd index c1f93109..42c4657e 100644 --- a/install/cron/nerd +++ b/install/cron/nerd @@ -11,6 +11,9 @@ 00 * * * * nerd /nerd/scripts/generate_blocklist.sh 0.5 | sort -n > /data/web_data/bad_ips.txt.tmp && mv /data/web_data/bad_ips.txt{.tmp,} 00 * * * * nerd /nerd/scripts/generate_blocklist.sh 0.2 | sort -n > /data/web_data/bad_ips_med_conf.txt.tmp && mv /data/web_data/bad_ips_med_conf.txt{.tmp,} +# Generate lists of IPs and their categories every hour +00 * * * * nerd python3 /nerd/scripts/generate_ip_category_files.py --threshold 0.5 --output /data/web_data + # Remove old IDEA messages from PostgreSQL every day at 03:00 # (enable if local PSQL is used to store alerts from Warden) #0 03 * * * nerd /nerd/scripts/nerd_clean_eventdb.sh > /dev/null @@ -24,6 +27,8 @@ 40 01,09,17 * * * nerd rsync -azq rsync-mirrors.uceprotect.net::RBLDNSD-ALL/dnsbl-1.uceprotect.net /data/blacklists/uceprotect-level1 # rsync PSBL blacklist 3 times a day 41 01,09,17 * * * nerd rsync -zq psbl-mirror.surriel.com::psbl/psbl.txt 
/data/blacklists/psbl.txt
+# rsync Crowdsec blacklist 3 times a day
+42 01,09,17 * * * nerd rsync -zq logs.liberouter.org::crowdsec/crowdsec_blacklist.csv /data/blacklists/crowdsec.csv
 
 # Export Crowdsec community blacklist to CSV
 # (enable if you're using Crowdsec)
diff --git a/scripts/download_malpedia_families.py b/scripts/download_malpedia_families.py
new file mode 100644
index 00000000..d00301c6
--- /dev/null
+++ b/scripts/download_malpedia_families.py
@@ -0,0 +1,39 @@
+#!/usr/bin/env python3
+
+# Download Malpedia's list of malware families to /data/malpedia/malware_families.yml
+
+import requests
+import yaml
+import json
+import os
+
+url = "https://malpedia.caad.fkie.fraunhofer.de/api/get/families"
+output_dir = "/data/malpedia/"
+output_file = os.path.join(output_dir, "malware_families.yml")
+
+# Use a timeout so an unresponsive server cannot block the script forever, and
+# fail with a clear HTTP error instead of trying to parse an error page.
+response = requests.get(url, timeout=60)
+response.raise_for_status()
+data = json.loads(response.content)
+
+output = {}
+for family_id, family_data in data.items():
+    name = family_data.get("common_name", "")
+    if name == "":
+        # No common name given: use the part of the ID after the first dot
+        # (e.g. "win.feodo" -> "feodo"), or the whole ID if it contains no dot.
+        try:
+            name = family_id.split('.')[1]
+        except IndexError:
+            name = family_id
+    output[family_id] = {
+        "common_name": name,
+        "description": family_data.get("description", ""),
+        "url": f"https://malpedia.caad.fkie.fraunhofer.de/details/{family_id}"
+    }
+
+os.makedirs(output_dir, exist_ok=True)
+
+with open(output_file, "w") as outfile:
+    yaml.dump(output, outfile, default_flow_style=False)
diff --git a/scripts/generate_ip_category_files.py b/scripts/generate_ip_category_files.py
new file mode 100644
index 00000000..c64ea5f4
--- /dev/null
+++ b/scripts/generate_ip_category_files.py
@@ -0,0 +1,119 @@
+#!/usr/bin/env python3
+"""
+Generate list of all IPs in NERD's database with their categories
+Parameters - path to configuration file
+           - path to output directory
+           - blacklist confidence threshold
+"""
+import os
+import sys
+import subprocess
+import argparse
+
+# Add to path the "one directory above the current file location" to find modules from "common"
+sys.path.insert(0,
os.path.abspath(os.path.join(os.path.dirname(os.path.abspath(__file__)), '..')))
+
+from common.config import read_config
+
+
+# parse arguments
+parser = argparse.ArgumentParser(
+    prog="generate_ip_category_files.py",
+    description="Generate list of all IPs in NERD's database with their categories"
+)
+parser.add_argument("-c", '--config', dest='cfg_file', default="/etc/nerd/threat_categorization.yml",
+                    help="Path to configuration file (default: /etc/nerd/threat_categorization.yml)")
+parser.add_argument("-o", '--output', dest='out_dir', default="/data/web_data/",
+                    help="Path to output directory (default: /data/web_data/)")
+parser.add_argument("-t", '--threshold', dest='conf_thr', type=float, default="0.5",
+                    help="Blacklist confidence threshold (default: 0.5)")
+parser.add_argument("-v", '--verbose', dest="verbose", action="store_true", help="Verbose mode")
+args = parser.parse_args()
+
+# read categorization config
+config = read_config(args.cfg_file)
+categories = [cat for cat in config.get('threat_categories')]
+categories.remove("unknown")
+
+# shell script used to execute the DB query (run by /bin/sh, so it must avoid bash-only syntax like brace expansion)
+script = """
+    echo \"# generated at $(date -u '+%Y-%m-%d %H:%M UTC')\" > {out_file}.tmp &&
+    echo \"# {header}\" >> {out_file}.tmp &&
+    mongosh nerd --quiet --eval '{query}' | grep -v \"^$\" | sort -n >> {out_file}.tmp &&
+    mv {out_file}.tmp {out_file}
+"""
+
+def fstr(template):
+    return template.format_map(globals())  # fills {out_file}, {header} and {query} from module globals, without eval()
+
+########################################################################################################################
+
+# full list - line format (ip,category,confidence)
+if args.verbose:
+    print("Generating full IP list (line format)")
+
+out_file = f"{args.out_dir}/ip_category.csv"
+header = "ip,category,confidence"
+query = '''
+function int2ip (ipInt) {
+    return ( (ipInt>>>24) + "." + (ipInt>>16 & 255) + "." + (ipInt>>8 & 255) + "." + (ipInt & 255) );
+}
+db.ip.aggregate([
+    { $unwind: "$_threat_category_summary" },
+    { $set: { category: "$_threat_category_summary.c", confidence: { $toString: "$_threat_category_summary.conf" } } },
+    { $project: { _id: 1, category: 1, confidence: 1 } }
+]).forEach(function(rec) { print(int2ip(rec._id) + "," + rec.category + "," + rec.confidence); })
+'''
+subprocess.run(fstr(script), shell=True)
+
+########################################################################################################################
+
+# full list - table format (ip,conf_scan,conf_bruteforce,...)
+if args.verbose:
+    print("Generating full IP list (table format)")
+
+out_file = f"{args.out_dir}/ip_category_table.csv"
+header = f"ip,conf_{',conf_'.join(categories)}"  # no leading "# " here, the script template already adds it
+query = '''
+function int2ip (ipInt) {
+    return ( (ipInt>>>24) + "." + (ipInt>>16 & 255) + "." + (ipInt>>8 & 255) + "." + (ipInt & 255) );
+}
+db.ip.aggregate([
+    { $project: { _id: 1, _threat_category_summary: 1 } },
+    { $set: { categories: { $arrayToObject: { $map: { input: "$_threat_category_summary", as: "cat", in: { k: "$$cat.c", v: { $toString: "$$cat.conf" } } } } } } },
+    { $replaceWith: { $mergeObjects: ["$$ROOT", "$categories"] } },
+    { $project: {
+        "_id": 1,
+'''
+for category in categories:
+    query += f'"{category}": {{ $ifNull: ["${category}", "0"] }},\n'
+query += '''
+}}]).forEach(function(rec) {
+    var categories = Object.keys(rec).filter(key => key !== "_id");
+    print(
+        int2ip(rec._id) + "," +
+        categories.map(key => rec[key]).join(",")
+    );
+})
+'''
+subprocess.run(fstr(script), shell=True)
+
+########################################################################################################################
+
+# blacklists
+if args.verbose:
+    print("Generating blacklists")
+
+for category in categories:
+    out_file = f"{args.out_dir}/bl_{category}.txt"
+    header = ""
+    query = f'''
+    function int2ip (ipInt) {{
+        return ( (ipInt>>>24) + "." + (ipInt>>16 & 255) + "." + (ipInt>>8 & 255) + "."
+ (ipInt & 255) );
+    }}
+    db.ip.find({{"_threat_category_summary": {{$elemMatch: {{"c": "{category}", "conf": {{$gt: {args.conf_thr}}}}}}}, "tags.whitelist": {{$exists: false}}}}, {{_id: 1}}).sort({{"_threat_category_summary.conf": -1}}).forEach( function(rec) {{ print(int2ip(rec._id)); }} );
+    '''
+    subprocess.run(fstr(script), shell=True)
+
+if args.verbose:
+    print("Done")
diff --git a/scripts/misp_updater.py b/scripts/misp_updater.py
index fb0eec68..26a62812 100755
--- a/scripts/misp_updater.py
+++ b/scripts/misp_updater.py
@@ -20,6 +20,7 @@
 from common.config import read_config
 import NERDd.core.mongodb as mongodb
 from common.task_queue import TaskQueueWriter
+from common.threat_categorization import *
 
 DEFAULT_MONGO_HOST = 'localhost'
 DEFAULT_MONGO_PORT = 27017
@@ -64,6 +65,15 @@
 logger.info("Loading config file {}".format(common_cfg_file))
 config.update(read_config(common_cfg_file))
 
+# Read categorization config (category definitions live under the 'threat_categories' key)
+categorization_cfg_file = os.path.join(config_base_path, 'threat_categorization.yml')
+logger.info("Loading config file {}".format(categorization_cfg_file))
+config.update(read_config(categorization_cfg_file))
+categorization_config = {
+    "categories": config.get('threat_categories'),
+    "malware_families": read_config(config.get('malpedia_family_list_path'))
+}
+
 inactive_ip_lifetime = config.get('record_life_length.misp', 180)
 
 db = mongodb.MongoEntityDatabase(config)
@@ -330,6 +340,7 @@ def process_ip(ip_addr, ip_info):
     :return: None
     """
     logger.debug("Processing IP: {}".format(ip_addr))
+    update_requests = []
 
     # check ip record in DB
     try:
@@ -358,6 +369,9 @@ def process_ip(ip_addr, ip_info):
             dedup_event['role'] = "src and dst at the same time"
             event_ids_roles[index][1] = "src and dst at the same time"
 
+    # aggregated threat category classification records
+    threat_category = {}
+
     # create all misp events and save the youngest datetime of the event for keep alive token
     events = []
     youngest_date = datetime(year=2000, month=1, day=1, hour=0,
minute=0, second=0) @@ -368,18 +382,56 @@ def process_ip(ip_addr, ip_info): if youngest_date < new_event['date']: youngest_date = new_event['date'] + attrib = {'type': '', 'value': '', 'comment': ''} # TODO get attrib info from misp + ip_role = event_info.get('role', "") + for category in classify_ip(ip_addr, "misp_receiver", logger, categorization_config, new_event, attrib, ip_role): + role = category['role'] + id = category['id'] + date = category['date'] + subcategories = category['subcategories'] + if role not in threat_category: + threat_category[role] = {} + if id not in threat_category[role]: + threat_category[role][id] = {} + if date not in threat_category[role][id]: + threat_category[role][id][date] = {} + if 'n_reports' not in threat_category[role][id][date]: + threat_category[role][id][date]['n_reports'] = 0 + if 'subcategories' not in threat_category[role][id][date]: + threat_category[role][id][date]['subcategories'] = {} + for subcategory in subcategories: + old = threat_category[role][id][date]['subcategories'].get(subcategory, []) + new = subcategories[subcategory] + threat_category[role][id][date]['subcategories'][subcategory] = list(set(old + new)) + threat_category[role][id][date]['n_reports'] += 1 + + # threat category updates + for role in threat_category: + for id in threat_category[role]: + for date in threat_category[role][id]: + n_reports = threat_category[role][id][date]['n_reports'] + subcategory_updates = [] + for subcategory, values in threat_category[role][id][date]['subcategories'].items(): + subcategory_updates.append(('extend_set', subcategory, values)) + update_requests += [( + 'array_upsert', + '_threat_category', + {'d': date, 'c': id}, + [('add', 'src.misp', n_reports), *subcategory_updates] + )] + if events: live_till = youngest_date + timedelta(days=inactive_ip_lifetime) if db_entity is not None: # compare 'misp_events' attrib from NERD with events list, if not same --> insert, else do not insert if db_entity.get('misp_events', 
{}) != events: # construct new update request and send it - update_requests = [('set', 'misp_events', events), ('set', '_ttl.misp', live_till), + update_requests += [('set', 'misp_events', events), ('set', '_ttl.misp', live_till), ('setmax', 'last_activity', youngest_date)] tq.put_task('ip', ip_addr, update_requests, "misp_updater") else: # ip address not even in NERD --> insert it - update_requests = [('set', 'misp_events', events), ('set', '_ttl.misp', live_till), ('setmax', + update_requests += [('set', 'misp_events', events), ('set', '_ttl.misp', live_till), ('setmax', 'last_activity', youngest_date)] tq.put_task('ip', ip_addr, update_requests, "misp_updater")