elastic

# -*- coding: utf-8 -*-
"""Elasticsearch integration: index construction, bulk indexing and search.

Each entry of ``ES_TEMPLATE`` is stored in its own Elasticsearch index whose
name equals the mapping type name (``/<name>/<name>/...``).  The module offers
management helpers (``build_index``, ``fill_index``, ``delete_index``), query
helpers (``search``, ``search_similar``) and ``post_save`` receivers that keep
the index updated when moderated, active objects are saved.
"""
import json
from copy import deepcopy

import requests
from django.db.models.signals import post_save
from django.dispatch import receiver
from django.template.defaultfilters import striptags

# NOTE(review): the wildcard import presumably supplies ``log_stacktrace`` and
# ``clean_empty`` used below -- neither is defined in this module.
from mainapp.helper import *
from vacancy.models import Vacancy
from mainapp.models import Company
from resume.models import Resume

ES_SETTINGS = {
    'LOCAL': 'http://127.0.0.1',
    'PORT': 9200,
    'SERVER_URL': 'http://127.0.0.1:9200',
    'INDEX': 'rabix_index',
    'PAGINATION': 10,
    'SHARDS': 1,
    'REPL': 0,
    'MIN_GRAM': 3,
    'MAX_GRAM': 15,
    'max_result_window': 100000,
}

HEADERS = {"Content-Type": "application/json"}

# Indexation scheme - necessary classes and their fields that we want to
# search in.  Property names use ``__`` to traverse relations (see get_field).
ES_TEMPLATE = {
    'vacancy': {
        'class': Vacancy,
        # 'title': 'position',
        'properties': {
            'position': {'type': 'text', "analyzer": "russian"},
            'description': {'type': 'text', "analyzer": "russian"},
            'salary_from': {'type': 'integer'},
            'salary_to': {'type': 'integer'},
            'employment_type': {'type': 'keyword'},
            'education': {'type': 'keyword'},
            'experience': {'type': 'keyword'},
            'city_list': {'type': 'keyword'},
            'field_list': {'type': 'keyword'},
            'driver_list': {'type': 'keyword'},
            'lang_list': {'type': 'keyword'},
            'duties': {'type': 'text', "analyzer": "russian"},
            'requirements': {'type': 'text', "analyzer": "russian"},
            'conditions': {'type': 'text', "analyzer": "russian"},
            'is_premium': {'type': 'boolean'},
            'company__company_type': {'type': 'keyword'},
            'company_title_prop': {'type': 'keyword'},
            'company__title': {'type': 'text', "analyzer": "russian"},
            'subway_list': {'type': 'keyword'},
            'created_at': {
                'type': 'date',
                "format": "yyyy-MM-dd HH:mm:ss"},
            'updated_at': {
                'type': 'date',
                "format": "yyyy-MM-dd HH:mm:ss"},
            'slug': {'type': 'text'},
            'moderated': {'type': 'boolean'}
        }
    },
    'company': {
        'class': Company,
        # 'title': 'title',
        'properties': {
            'title': {'type': 'text', "analyzer": "russian"},
            'description': {'type': 'text', "analyzer": "russian"},
            'company_type': {'type': 'keyword'},
            'moderated': {'type': 'boolean'}
        }
    },
    'resume': {
        'class': Resume,
        # 'title': 'position',
        'properties': {
            'position': {'type': 'text', "analyzer": "russian"},
            'skills': {'type': 'text', "analyzer": "russian"},
            'edu_txt': {"type": "text", "analyzer": "russian"},
            'exp_txt': {"type": "text", "analyzer": "russian"},
            'employment_type': {'type': 'keyword'},
            'city__title': {'type': 'keyword'},
            'user__employee__gender': {'type': 'keyword'},
            'user__employee__get_years': {'type': 'integer'},
            'is_premium': {'type': 'boolean'},
            'created_at': {
                'type': 'date',
                "format": "yyyy-MM-dd HH:mm:ss"},
            'updated_at': {
                'type': 'date',
                "format": "yyyy-MM-dd HH:mm:ss"},
            'salary_from': {'type': 'integer'},
            'field_list': {'type': 'keyword'},
            'exp_full_years': {'type': 'integer'},
            'education_list': {'type': 'keyword'},
            'driver_list': {'type': 'keyword'},
            'lang_list': {'type': 'keyword'},
            'relationship_status': {'type': 'keyword'},
            'nationality__title': {'type': 'keyword'},
            'relocation': {'type': 'boolean'},
            'children': {'type': 'boolean'},
            'has_image': {'type': 'boolean'},
            'moderated': {'type': 'boolean'}
        }
    },
}

# Fields that need special treatment when a model instance is serialized.
_DATE_FIELDS = ('created_at', 'updated_at')
_HTML_FIELDS = ('description', 'duties')
# Must agree with the "format" declared for the date properties above.
_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"

# Number of objects sent to Elasticsearch per bulk request.
_BULK_CHUNK = 5000

# Maps the ``find_type`` GET parameter to the fields the full-text query
# searches in; unknown values fall back to "all".
_SEARCH_FIELDS = {
    "all": [
        "title", "position", "edu_txt", "exp_txt", "description", "duties",
        "requirements", "conditions", "company__title"
    ],
    "name": ["title", "position"],
    "name.edu": ["title", "edu_txt"],
    "name.exp": ["title", "exp_txt"],
    "name.edu.exp": ["title", "edu_txt", "exp_txt"],
    "edu": ["edu_txt"],
    "edu.exp": ["edu_txt", "exp_txt"],
    "exp": ["exp_txt"],
    "name.desc": [
        "position", "description", "duties", "requirements", "conditions"
    ],
    "name.comp": ["position", "company__title"],
    "name.desc.comp": [
        "position", "description", "duties", "requirements", "conditions",
        "company__title"
    ],
    "desc.comp": [
        "description", "duties", "requirements", "conditions",
        "company__title"
    ],
    "desc": ["description", "duties", "requirements", "conditions"],
    "comp": ["company__title"],
}


def _type_url(name, suffix):
    """Return the URL of ``suffix`` under index ``name``, mapping type ``name``."""
    return ES_SETTINGS['SERVER_URL'] + '/' + name + '/' + name + '/' + suffix


def _serialize_instance(instance, properties):
    """Build the document body for ``instance`` from the given property names.

    Date fields are formatted with ``_DATE_FORMAT`` and HTML fields are passed
    through ``striptags``.  A value of ``None`` is stored as ``None`` (it is
    never handed to ``strftime``/``striptags``; the latter would otherwise
    index the literal text "None").  Any failure while reading or converting a
    field is logged and that field is stored as ``None`` so that one changed
    field cannot break indexing of the whole object.
    """
    field_data = {}
    for field in properties:
        try:
            value = get_field(instance, field)
            if value is None:
                field_data[field] = None
            elif field in _DATE_FIELDS:
                field_data[field] = value.strftime(_DATE_FORMAT)
            elif field in _HTML_FIELDS:
                # striptags removes HTML tags
                field_data[field] = striptags(value)
            else:
                field_data[field] = value
        except Exception as e:
            field_data[field] = None
            log_stacktrace(e)
            print(e)
    return field_data


def _load_hits(results, placeholder_keys, with_score=False):
    """Turn raw Elasticsearch hits into model instances.

    Every hit is looked up by primary key in the model class registered in
    ``ES_TEMPLATE`` under the hit's ``_type``.  If the lookup fails the error
    is logged and a placeholder dict with ``model``, ``id`` and each of
    ``placeholder_keys`` set to ``None`` is returned in its place.  When
    ``with_score`` is true, the hit's ``_score`` is attached to the instance
    as ``score``.
    """
    found = []
    for result in results:
        element_model = result['_type']
        element_id = result['_id']
        try:
            element_class = ES_TEMPLATE[element_model]['class']
            element = element_class.objects.get(pk=element_id)
            if with_score:
                element.score = result['_score']
            found.append(element)
        except Exception as e:
            log_stacktrace(e)
            placeholder = {"model": element_model, "id": element_id}
            for key in placeholder_keys:
                placeholder[key] = None
            found.append(placeholder)
    return found


def build_index():
    """Create one Elasticsearch index per ``ES_TEMPLATE`` entry.

    Each index gets the shared settings (shards, replicas, result window and
    the custom "russian" analyzer) plus the mapping of its own model, and the
    raw server answer is printed.
    """
    # An `_all` field with an nGram analyzer was sketched here originally but
    # was never enabled.
    for name, spec in ES_TEMPLATE.items():
        # The model class is not part of the mapping sent to Elasticsearch.
        mapping = dict(
            (key, deepcopy(value))
            for key, value in spec.items() if key != 'class'
        )
        data = {
            "settings": {
                "number_of_shards": ES_SETTINGS['SHARDS'],
                "number_of_replicas": ES_SETTINGS['REPL'],
                "max_result_window": ES_SETTINGS['max_result_window'],
                "analysis": {
                    "filter": {
                        "russian_stop": {
                            "type": "stop",
                            "stopwords": "_russian_"
                        },
                        "russian_keywords": {
                            "type": "keyword_marker",
                            "keywords": []
                        },
                        "russian_stemmer": {
                            "type": "stemmer",
                            "language": "russian"
                        }
                    },
                    "analyzer": {
                        "russian": {
                            "tokenizer": "standard",
                            "filter": [
                                "lowercase",
                                "russian_stop",
                                "russian_keywords",
                                "russian_stemmer"
                            ]
                        }
                    }
                }
            },
            "mappings": {name: mapping}
        }
        response = requests.put(
            ES_SETTINGS['SERVER_URL'] + '/' + name + '/',
            json=data
        )
        print('\nBuild output:\n"%s"' % response.text)


def fill_index():
    """Index every object of every model listed in ``ES_TEMPLATE``.

    Objects are sent through the bulk API in batches of ``_BULK_CHUNK``.
    A failing batch is reported and skipped; progress is printed per batch.
    """
    print('\nFill output:')
    for name in ES_TEMPLATE:
        model = ES_TEMPLATE[name]
        # Slicing needs a deterministic order, otherwise consecutive slices
        # may skip or repeat rows.
        queryset = model['class'].objects.order_by('pk')
        size = queryset.count()

        for start in range(0, size, _BULK_CHUNK):
            elements = queryset[start:start + _BULK_CHUNK]

            # Bulk body: an action line followed by the document, each
            # newline-terminated.
            lines = []
            for element in elements:
                field_data = _serialize_instance(
                    element, model['properties'])
                try:
                    document = json.dumps(field_data)
                except (TypeError, ValueError) as e:
                    # One unserializable object must not abort the reindex.
                    log_stacktrace(e)
                    print(e)
                    continue
                lines.append('{"index": {"_id": "%s"}}' % element.pk)
                lines.append(document)

            if lines:
                model_data = '\n'.join(lines) + '\n'
                try:
                    response = requests.put(
                        _type_url(name, '_bulk'),
                        data=model_data,
                        headers=HEADERS
                    )
                    answer = json.loads(response.text)
                    print('Model "%s", errors: %s, count: %s ' % (
                        name, answer['errors'], len(answer['items'])
                    ))
                except Exception as e:
                    print('ERROR: %s' % e)
            print("%s out of %s" % (min(start + _BULK_CHUNK, size), size))


def get_field(instance, field):
    """Resolve a ``__``-separated attribute path on ``instance``.

    ``get_field(resume, 'city__title')`` returns ``resume.city.title``.
    Returns ``None`` as soon as any step raises ``AttributeError``.
    """
    attr = instance
    for elem in field.split('__'):
        try:
            attr = getattr(attr, elem)
        except AttributeError:
            return None
    return attr


def search_similar(instance, model):
    """Return up to six indexed objects similar to ``instance``.

    ``model`` is ``'vacancy'`` or ``'resume'``; any other value yields an
    empty list without querying Elasticsearch.  Vacancies are matched on
    position/description within the same cities, resumes on position within
    the same city (the city filter is omitted when the resume has no city).
    ``instance`` itself is excluded.  Any error while querying is logged and
    an empty list is returned.
    """
    exclude_self = {"term": {"_id": instance.pk}}
    if model == 'vacancy':
        must = [
            {"multi_match": {
                "query": instance.position,
                "fields": ["position^2", "description"]
            }},
            {"terms": {"city_list": instance.city_list}}
        ]
    elif model == 'resume':
        must = [{"match": {"position": instance.position}}]
        # ``instance.city`` may be unset; do not fail on ``None.title``.
        city_title = get_field(instance, 'city__title')
        if city_title is not None:
            must.append({"term": {"city__title": city_title}})
    else:
        return []

    data = {
        "query": {"bool": {"must": must, "must_not": exclude_self}},
        "size": 6,
        "from": 0,
        "sort": {"_score": {"order": "desc"}}
    }

    found_array = []
    try:
        response = requests.post(
            _type_url(model, '_search'),
            data=json.dumps(data),
            headers=HEADERS
        )
        hits = json.loads(response.text)['hits']
        if hits['total'] > 0:
            found_array = _load_hits(
                hits['hits'], ("position", "description"))
    except Exception as e:
        log_stacktrace(e)
        print(e)
    return found_array


def search(search_string, request, model="", field=None, only_count=False):
    """Run a filtered full-text search against the ``model`` index.

    Filters, paging (``page``), sorting (``sort`` as ``<field>.<asc|desc>``)
    and the searched field set (``find_type``) are read from ``request.GET``.

    Args:
        search_string: text for the full-text part; when empty only the
            filters apply.
        request: Django request whose ``GET`` carries the filter parameters.
        model: index / mapping type name, a key of ``ES_TEMPLATE``.
        field: extra value for a ``terms`` filter on ``field_list``.
        only_count: when true, return the parsed ``_count`` response instead
            of running the search.

    Returns:
        For ``only_count`` the parsed JSON of the ``_count`` endpoint.
        Otherwise a dict with ``count``, ``pages``, ``avg_salary``, ``found``
        (model instances carrying ``score``, or placeholder dicts for hits
        that could not be loaded), ``query`` and ``debug_msg``, plus
        ``max_score`` when there is at least one hit.  If Elasticsearch does
        not return a ``hits`` section the zeroed result is returned.
    """
    params = request.GET
    # Copy so that neither clean_empty nor callers can alter the table.
    search_fields = list(
        _SEARCH_FIELDS.get(params.get('find_type', ""), _SEARCH_FIELDS["all"])
    )

    # ``page`` is user input: fall back to the first page on anything that
    # is not a positive integer.
    try:
        query_page = int(params.get('page') or 1)
    except (TypeError, ValueError):
        query_page = 1
    if query_page < 1:
        query_page = 1
    size = int(ES_SETTINGS['PAGINATION'])
    beginning = (query_page - 1) * size

    # ``sort`` is user input as well: tolerate a missing direction / empty
    # value instead of raising.
    sort_parts = (params.get('sort', '_score.desc') or '').split('.')
    sort_name = sort_parts[0] or '_score'
    sort_order = sort_parts[1].lower() if len(sort_parts) > 1 else 'desc'
    if sort_order not in ('asc', 'desc'):
        sort_order = 'desc'

    # Languages are indexed as "<lang>-<level>" pairs.
    lang_query = [
        '%s-%s' % (a, b)
        for a, b in zip(params.getlist('lang'), params.getlist('langlevel'))
    ]
    if lang_query and lang_query[0] == '-':
        # First pair is empty on both sides: no language filter requested.
        lang_query = None

    # A comma separated value selects several employment types at once.
    employment_types = params.get('employment_type', None)
    if employment_types and ',' in employment_types:
        employment_clause = {
            "terms": {"employment_type": employment_types.split(',')}
        }
    else:
        employment_clause = {"term": {"employment_type": employment_types}}

    must = []
    if search_string:
        must.append({"multi_match": {
            "query": search_string,
            "fields": search_fields
        }})
    must.extend([
        {"terms": {"city_list": params.getlist('city')}},
        {"terms": {"city__title": params.getlist('city_res')}},
        {"terms": {"field_list": params.getlist('field')}},
        {"terms": {"field_list": field}},
        {"terms": {"lang_list": lang_query}},
        {"terms": {"driver_list": params.getlist('driver')}},
        employment_clause,
        {"terms": {"subway_list": params.getlist('subway')}},
        {"terms": {"education_list": params.getlist('education_list')}},
        {"term": {"experience": params.get('experience', None)}},
        {"term": {
            "company__company_type": params.get('company_type', None)
        }},
        {"term": {"company_type": params.get('_company_type', None)}},
        {"term": {"is_premium": params.get('is_premium', None)}},
        {"range": {
            "salary_from": {
                "gte": params.get('salary_from', None),
                "lte": params.get('salary_to', None)
            }
        }},
        {"range": {
            "user__employee__get_years": {
                "gte": params.get('age_from', None),
                "lte": params.get('age_to', None)
            }
        }},
        {"range": {
            "exp_full_years": {
                "gte": params.get('exp_full_years', None)
            }
        }},
        {"term": {"user__employee__gender": params.get('gender', None)}},
        {"term": {"nationality__title": params.get('citizenship', None)}},
        {"term": {"relocation": params.get('trip', None)}},
        {"term": {"children": params.get('children', None)}},
        {"term": {"has_image": params.get('has_image', None)}},
        {"term": {"relationship_status": params.get('married', None)}},
        {"term": {"moderated": True}},
    ])
    if params.get('salary_exists', None):
        must.append({"range": {"salary_from": {"gte": 1}}})

    # NOTE(review): clean_empty comes from mainapp.helper; it presumably
    # drops clauses whose values are None/empty -- confirm.
    query = clean_empty({"bool": {"must": must}})
    if not query:
        query = {"match_all": {}}

    if only_count:
        response = requests.post(
            _type_url(model, '_count'),
            data=json.dumps({"query": query}),
            headers=HEADERS
        )
        return json.loads(response.text)

    data = {
        "query": query,
        "size": size,
        "from": beginning,
        "aggs": {
            "avg_salary": {"avg": {"field": "salary_from"}}
        },
    }
    print(query)

    # Passing the sort through ``params`` lets requests URL-encode it, so a
    # crafted value cannot add further parameters to the Elasticsearch URL.
    response = requests.post(
        _type_url(model, '_search'),
        params={'sort': '%s:%s' % (sort_name, sort_order)},
        data=json.dumps(data),
        headers=HEADERS
    )

    resulting_response = {
        'count': 0,
        'pages': 0,
        'avg_salary': 0,
        'found': []
    }

    try:
        parsed_response = json.loads(response.text)
    except ValueError:
        parsed_response = {}
    hits = parsed_response.get('hits')
    if hits is None:
        # Elasticsearch answered with an error (bad sort field, page beyond
        # the result window, missing index, ...): report and return nothing.
        print('ERROR: %s' % response.text)
    elif hits['total'] > 0:
        total = int(hits['total'])
        avg_salary = parsed_response['aggregations']['avg_salary']['value']
        try:
            avg_salary = int(avg_salary)
        except TypeError:
            # No salary data in the result set: the average stays None.
            pass

        resulting_response['count'] = hits['total']
        resulting_response['max_score'] = hits['max_score']
        resulting_response['avg_salary'] = avg_salary
        # Ceiling division: a partially filled last page still counts.
        resulting_response['pages'] = (total + size - 1) // size
        resulting_response['found'] = _load_hits(
            hits['hits'], ("reference", "title", "description"),
            with_score=True
        )

    resulting_response['query'] = query
    resulting_response['debug_msg'] = 'srch'
    print(resulting_response)
    return resulting_response


def delete_index():
    """Delete the index of every ``ES_TEMPLATE`` entry.

    Returns the raw text of the last server answer, or ``None`` when the
    template is empty.
    """
    answer = None
    for name in ES_TEMPLATE:
        response = requests.delete(ES_SETTINGS['SERVER_URL'] + '/' + name)
        answer = response.text
        print('\nDeletion output:\n"%s"' % answer)
    return answer


def save_instance(instance, name):
    """Index (create or overwrite) a single object under ``ES_TEMPLATE[name]``.

    Failures -- including HTTP error responses from Elasticsearch -- are
    logged and printed, never raised, so that saving a model cannot fail
    because of the search backend.
    """
    model = ES_TEMPLATE[name]
    field_data = _serialize_instance(instance, model['properties'])
    try:
        response = requests.put(
            _type_url(name, str(instance.pk)),
            data=json.dumps(field_data),
            headers=HEADERS
        )
        response.raise_for_status()
    except Exception as e:
        log_stacktrace(e)
        print(e)


# NOTE(review): the receivers below only ever add or refresh documents.  An
# object that stops being moderated/active after it was indexed keeps its old
# document -- confirm whether it should be re-indexed or removed instead.

@receiver(post_save, sender=Resume)
def resume_handler(sender, instance, **kwargs):
    """Index a resume after save when it is moderated and active."""
    if instance.moderated and instance.moderation_status == 'Active':
        save_instance(instance, 'resume')


@receiver(post_save, sender=Vacancy)
def vacancy_handler(sender, instance, **kwargs):
    """Index a vacancy after save when it is moderated and active."""
    if instance.moderated and instance.status == 'Active':
        save_instance(instance, 'vacancy')


@receiver(post_save, sender=Company)
def company_handler(sender, instance, **kwargs):
    """Index a company after save when it is moderated and active."""
    if instance.moderated and instance.status == 'Active':
        save_instance(instance, 'company')

Be the first to comment

You can use [html][/html], [css][/css], [php][/php] and more to embed code. URLs are automatically hyperlinked. Line breaks and paragraphs are automatically generated.