Commit 82941106 authored by pushkar191098's avatar pushkar191098
Browse files

initial commit

1 merge request!1addition: main_server_code, scripts, docs
Showing with 743 additions and 0 deletions
+743 -0
BASIC_HTTP_USERNAME=test
BASIC_HTTP_PASSWORD=generic@123#
ELASTIC_DB_URL=https://localhost:9200
BLOB_SAS_TOKEN=XXXXX
BLOB_ACCOUNT_URL=YYYYY
BLOB_CONTAINER_NAME=ZZZZZ
MAX_RUNNING_JOBS=4
**__pycache__
*.vscode
*.log
/env
exp_result.py
**.DS_Store
/upload/*
#
\ No newline at end of file
# Build on the shared crawler base image (presumably provides browser/driver
# prerequisites — confirm against the registry image definition).
FROM mycrawlercontainerregistry.azurecr.io/general-1-crawlerbase:latest
# Copy the whole repository into the image and install Python dependencies.
COPY / /app
WORKDIR /app
RUN pip3 install -r requirements.txt
# start.sh is the container entry script that launches the server.
COPY start.sh /usr/bin/start.sh
RUN chmod +x /usr/bin/start.sh
ENTRYPOINT ["/bin/bash","/usr/bin/start.sh"]
# Previous standalone python:3.6 build, kept for reference:
#FROM python:3.6-slim-stretch
#COPY / /app
#WORKDIR /app
#RUN apt update
#RUN pip3 install -r requirements.txt
#COPY start.sh /usr/bin/start.sh
#RUN chmod +x /usr/bin/start.sh
#CMD ["/usr/bin/start.sh"]
src/app.py 0 → 100644
import json
import os
import sys
from flask import Flask
from flask.blueprints import Blueprint
from flask_basicauth import BasicAuth
from flask_cors import CORS
# local imports
import config
import routes
from models import AgentUtils
# flask server
server = Flask(__name__)

# server configuration
config.SERVER_STATIC_PATH = server.static_folder
server.config['BASIC_AUTH_USERNAME'] = config.BASIC_HTTP_USERNAME
server.config['BASIC_AUTH_PASSWORD'] = config.BASIC_HTTP_PASSWORD

# basic_auth for server
basic_auth = BasicAuth(server)

# load agents config
with open(os.path.join(config.SERVER_STATIC_PATH, config.AGENT_CONFIG_PATH), 'r') as f:
    agent_list = json.load(f)

# the `scripts` package holds one module per script type (see AGENT_SCRIPT_TYPES)
__import__("scripts")
my_scripts = sys.modules["scripts"]

agentUtils = AgentUtils()
agentUtils.filepath = os.path.join(
    config.SERVER_STATIC_PATH, config.AGENT_CONFIG_PKL_PATH)
pkl_agent_list = agentUtils.listAgents()

# Register agents present in agents.json but missing from the pickle store.
# NOTE(review): this assumes the pickle list is a prefix of the JSON list —
# only the trailing `len_diff` JSON entries are treated as new; confirm.
len_diff = len(agent_list) - len(pkl_agent_list)
for i in range(len(agent_list)-1, len(agent_list)-len_diff-1, -1):
    agent = agent_list[i]
    agent_script = dict()
    # fixed: loop variable renamed so it no longer shadows the builtin `type`
    for script_type in config.AGENT_SCRIPT_TYPES.values():
        agent_script[script_type] = my_scripts.__dict__[
            script_type].__dict__[agent['scripts'][script_type]]
    agentUtils.addAgent(agent['agentId'],
                        agent['description'],
                        agent['provider'],
                        agent_script,
                        agent['URL'])

# server CORS policy
if config.SERVER_CORS:
    cors = CORS(server, resources={r"/api/*": {"origins": "*"}})

# add blueprint routes to server
for blueprint in vars(routes).values():
    if isinstance(blueprint, Blueprint):
        server.register_blueprint(blueprint, url_prefix=config.API_URL_PREFIX)

# sample route
@server.route('/')
def home():
    return "<h1>HI</h1>"

# start server
if __name__ == "__main__":
    print('starting server at {} at port {}'.format(
        config.SERVER_HOST, config.SERVER_PORT))
    server.run(host=config.SERVER_HOST,
               port=config.SERVER_PORT,
               debug=config.SERVER_DEBUG,
               threaded=True)
from .scraping_utils import get_driver
from .elastic_wrapper import Log
from .errors import ValueMissing, FormatError, BadRequestError
from .blob_storage import BlobStorage
import os
import config
from azure.storage.blob import BlobServiceClient
class BlobStorage(object):
    """Thin wrapper around Azure BlobServiceClient for per-agent uploads.

    Credentials (SAS token), account URL and container name come from config.
    """

    def __init__(self):
        self.blob_service_client = BlobServiceClient(account_url=config.BLOB_ACCOUNT_URL, credential=config.BLOB_SAS_TOKEN)
        # set via set_agent_folder before any upload
        self.root_folder = None

    @property
    def root_folder(self):
        """Virtual folder (agent id) under which files are uploaded."""
        return self._root_folder

    @root_folder.setter
    def root_folder(self, rf):
        self._root_folder = rf

    @property
    def blob_service_client(self):
        """Underlying azure.storage.blob.BlobServiceClient instance."""
        return self._blob_service_client

    @blob_service_client.setter
    def blob_service_client(self, bsc):
        self._blob_service_client = bsc

    def set_agent_folder(self, agent_id):
        """Route subsequent uploads under the given agent's folder."""
        self.root_folder = agent_id

    def upload_file(self, file_name, file_contents):
        """Upload file_contents as <root_folder>/<file_name>.

        Returns (True, 'true') on success, (False, error_message) on failure.
        """
        # fixed: blob names must use '/' separators; os.path.join produced
        # '\\' on Windows, creating wrong blob paths
        upload_file_path = '/'.join((self.root_folder, file_name))
        blob_client = self.blob_service_client.get_blob_client(container=config.BLOB_CONTAINER_NAME, blob=upload_file_path)
        try:
            blob_client.upload_blob(file_contents)
        except Exception as e:
            return False, str(e)
        return True, 'true'
import config
from elasticsearch import Elasticsearch
import json
import time
class Log(object):
    """Structured logger that indexes job/app/data events into Elasticsearch.

    Falls back to appending JSON lines to 'logger.txt' when ES is unreachable.
    """

    @classmethod
    def from_default(cls):
        """Build a Log without a run context (only get_status is usable)."""
        return cls(None)

    def __init__(self, agentRunContext):
        self.agentRunContext = agentRunContext
        self.es_client = Elasticsearch([config.ELASTIC_DB_URL])

    def __populate_context(self):
        """Common fields attached to every document; requires a run context."""
        data = {
            'agentId': self.agentRunContext.requestBody['agentId'],
            'jobId': self.agentRunContext.jobId,
            'jobType': self.agentRunContext.jobType,
            'timestamp': int(time.time()*1000),
            'buildNumber': config.BUILD_NUMBER
        }
        return data

    def __index_data_to_es(self, index, data):
        # Best-effort: ping first, and spill to a local file when ES is down.
        if self.es_client.ping():
            self.es_client.index(index=index, body=json.dumps(data))
        else:
            with open('logger.txt', 'a+') as f:
                f.write(json.dumps(data)+'\n')

    def info(self, info_type, message):
        """Index an application log entry of the given type."""
        info_data = self.__populate_context()
        info_data['type'] = info_type
        info_data['message'] = message
        self.__index_data_to_es(config.ES_LOG_INDEX, info_data)

    def data(self, data):
        """Index a crawled-data document, merged with the run context."""
        data.update(self.__populate_context())
        self.__index_data_to_es(config.ES_DATA_INDEX, data)

    def job(self, status, message):
        """Index a job status transition."""
        job_data = self.__populate_context()
        job_data['status'] = status
        job_data['message'] = message
        self.__index_data_to_es(config.ES_JOB_INDEX, job_data)

    def get_status(self, jobId):
        """Return the most recent status document for jobId.

        Returns a {'status': ..., 'message': ...} dict; distinct sentinel
        statuses signal ES connection failure or an unknown jobId.
        """
        # fixed: removed stray debug print(jobId)
        if not self.es_client.ping():
            return {'status': 'ES_CONNECTION_FAILED', 'message': "Not able to connect to ES DB"}
        else:
            # latest document first, exact jobId match on the keyword field
            search_param = {
                "sort": [
                    {
                        "timestamp": {
                            "order": "desc"
                        }
                    }
                ],
                "query": {
                    "bool": {
                        "must": [
                            {"match": {
                                "jobId.keyword": jobId
                            }}
                        ]
                    }
                }
            }
            res = self.es_client.search(
                index=config.ES_JOB_INDEX, body=search_param)
            if len(res['hits']['hits']) > 0:
                source = res['hits']['hits'][0]['_source']
                return {'status': source['status'], 'message': source['message']}
            else:
                return {'status': 'JOBID_NOT_FOUND', 'message': "Please check the given jobId"}
from flask import jsonify
class RestAPIError(Exception):
    """Base exception for REST handlers; carries an HTTP status and payload."""

    def __init__(self, status_code=500, payload=None):
        self.payload = payload
        self.status_code = status_code

    def to_response(self):
        """Render this error as a Flask (json body, status code) pair."""
        return jsonify({'error': self.payload}), self.status_code


class BadRequestError(RestAPIError):
    """HTTP 400 Bad Request."""

    def __init__(self, payload=None):
        super().__init__(400, payload)


class InternalServerErrorError(RestAPIError):
    """HTTP 500 Internal Server Error."""

    def __init__(self, payload=None):
        super().__init__(500, payload)
class FormatError(Exception):
    """Exception carrying a numeric code and a human-readable message."""

    def __init__(self, code, message):
        self._code = code
        self._message = message

    @property
    def code(self):
        """Numeric error code supplied at construction time."""
        return self._code

    @property
    def message(self):
        """Human-readable description of the failure."""
        return self._message

    def __str__(self):
        return f'{self.__class__.__name__}: {self.message}'
class WorkflowkeyError(Exception):
    """Exception carrying a numeric code and a human-readable message."""

    def __init__(self, code, message):
        self._message = message
        self._code = code

    @property
    def code(self):
        """Numeric error code supplied at construction time."""
        return self._code

    @property
    def message(self):
        """Human-readable description of the failure."""
        return self._message

    def __str__(self):
        return '{}: {}'.format(type(self).__name__, self._message)
class FileErrors(Exception):
    # Exception for file-related failures; carries a code and message.
    #
    # NOTE(review): __repr__ returns a dict, not a str — calling repr() on an
    # instance raises TypeError. Callers presumably invoke e.__repr__()
    # directly to get the payload dict; confirm before changing.
    def __init__(self, code, message):
        self._code = code
        self._message = message
    @property
    def code(self):
        # numeric/application error code
        return self._code
    @property
    def message(self):
        # human-readable description
        return self._message
    def __repr__(self):
        return {"code": self.code, "message": self.__class__.__name__ + ': ' + self.message}
class FileEncodingError(Exception):
    """Exception carrying a numeric code and a human-readable message."""

    def __init__(self, code, message):
        self._code = code
        self._message = message

    @property
    def code(self):
        """Numeric error code supplied at construction time."""
        return self._code

    @property
    def message(self):
        """Human-readable description of the failure."""
        return self._message

    def __str__(self):
        return f'{type(self).__name__}: {self._message}'
class ServiceError(Exception):
    """Exception carrying a numeric code and a human-readable message."""

    def __init__(self, code, message):
        self._code = code
        self._message = message

    @property
    def code(self):
        """Numeric error code supplied at construction time."""
        return self._code

    @property
    def message(self):
        """Human-readable description of the failure."""
        return self._message

    def __str__(self):
        return '%s: %s' % (self.__class__.__name__, self.message)
class ValueMissing(Exception):
    """Raised when a mandatory request value is absent."""

    def __init__(self, message):
        self.message = message

    @property
    def message(self):
        """Description of which value was missing."""
        return self._message

    @message.setter
    def message(self, value):
        self._message = value

    def __str__(self):
        return self.message

    # repr intentionally mirrors str for this error type
    __repr__ = __str__
import os
from pathlib import Path
import config
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
# Selenium Service wrapping the chromedriver binary from config.
# NOTE(review): config.CHROMEDRIVER_PATH is a Windows path — confirm it is
# overridden for the Linux container deployment.
chrome_path = Service(config.CHROMEDRIVER_PATH)
def enable_download_headless(browser, download_dir):
    """Allow file downloads in headless Chrome by sending Page.setDownloadBehavior.

    NOTE(review): registers a raw Chromium DevTools command on Selenium's
    private ``command_executor._commands`` — a Selenium 3-era workaround;
    confirm it still works with the pinned Selenium version.
    """
    browser.command_executor._commands["send_command"] = (
        "POST", '/session/$sessionId/chromium/send_command')
    params = {'cmd': 'Page.setDownloadBehavior', 'params': {
        'behavior': 'allow', 'downloadPath': download_dir}}
    browser.execute("send_command", params)
def get_driver(temp_directory):
    """Create a headless Chrome WebDriver that downloads into temp_directory.

    Creates temp_directory if needed, enables browser console logging, and
    sets Chrome prefs so PDFs are downloaded instead of opened inline.

    NOTE(review): the desired_capabilities= keyword was removed in
    Selenium 4.10 — confirm the pinned Selenium version accepts this call.
    """
    Path(temp_directory).mkdir(parents=True, exist_ok=True)
    download_dir = os.path.join(temp_directory)
    chrome_options = Options()
    d = DesiredCapabilities.CHROME
    # capture all browser console log entries
    d['goog:loggingPrefs'] = {'browser': 'ALL'}
    chrome_options.add_argument("--headless")
    chrome_options.add_argument("--window-size=1920x1080")
    chrome_options.add_argument("--disable-notifications")
    chrome_options.add_argument('--no-sandbox')
    chrome_options.add_argument('--verbose')
    chrome_options.add_argument('--log-level=3')
    chrome_options.add_argument('--disable-gpu')
    chrome_options.add_argument('--disable-dev-shm-usage')
    chrome_options.page_load_strategy = 'normal'
    # fixed UA string so target sites serve the regular desktop pages
    chrome_options.add_argument(
        '--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.190 Safari/537.36')
    chrome_options.add_argument('--disable-software-rasterizer')
    # download prefs: silent downloads into download_dir, PDFs saved not shown
    chrome_options.add_experimental_option("prefs", {
        "download.default_directory": str(download_dir),
        "download.prompt_for_download": False,
        "download.directory_upgrade": True,
        "safebrowsing_for_trusted_sources_enabled": False,
        "safebrowsing.enabled": False,
        "plugins.always_open_pdf_externally": True
    })
    driver = webdriver.Chrome(
        service=chrome_path, options=chrome_options, desired_capabilities=d)
    # headless Chrome blocks downloads unless explicitly allowed via CDP
    enable_download_headless(driver, download_dir)
    return driver
src/config.py 0 → 100644
import os

# ------------------server configuration--------------------------
SERVER_HOST = '0.0.0.0'
SERVER_PORT = 5001
SERVER_DEBUG = True
SERVER_CORS = False
# filled in at startup by app.py with Flask's static folder
SERVER_STATIC_PATH = ''
# API configuration
API_URL_PREFIX = "/general"
BUILD_NUMBER = 'BUILD_NUMBER_001'
API_MANDATORY_PARAMS = ['agentId', 'search', 'type']
# Application configuration
BASIC_HTTP_USERNAME = os.environ.get('BASIC_HTTP_USERNAME')
BASIC_HTTP_PASSWORD = os.environ.get('BASIC_HTTP_PASSWORD')
# ------------------agent configuration---------------------------
# AGENT_SCRIPT_TYPES = { 'JOB_TYPE_1' : 'JOB_TYPE_1_FOLDER', 'JOB_TYPE_2' : 'JOB_TYPE_2_FOLDER' }
AGENT_SCRIPT_TYPES = {
    'INFORMATION': 'info',
    'PDF': 'pdf'
}
# agent configuration file
AGENT_CONFIG_PATH = 'agent_configs/agents.json'
AGENT_CONFIG_PKL_PATH = 'agent_configs/agents.pkl'
# ------------------AzureBlob Variable----------------------------
# AzureBlob variable
# NOTE: identifier keeps the original (misspelled) name for compatibility
BLOB_INTIGRATION = False
BLOB_SAS_TOKEN = os.environ.get('BLOB_SAS_TOKEN')
BLOB_ACCOUNT_URL = os.environ.get('BLOB_ACCOUNT_URL')
# fixed: read BLOB_CONTAINER_NAME (the key defined in .env); fall back to the
# old CONTAINER_NAME key for backward compatibility
BLOB_CONTAINER_NAME = os.environ.get(
    'BLOB_CONTAINER_NAME', os.environ.get('CONTAINER_NAME'))
# ------------------Queuing variables-----------------------------
# Queuing variables
MAX_RUNNING_JOBS = int(os.environ.get('MAX_RUNNING_JOBS', 4))
MAX_WAITING_JOBS = int(os.environ.get('MAX_WAITING_JOBS', 10))
# ------------------ElasticSearch DB variables--------------------
ELASTIC_DB_URL = os.environ.get('ELASTIC_DB_URL')
# ES index variables
ES_LOG_INDEX = 'general-app-logs'
ES_JOB_INDEX = 'general-job-stats'
ES_DATA_INDEX = 'general-crawled-data'
# ------------------Logging variables-----------------------------
JOB_OUTPUT_PATH = "output"
# JobStatus variables
JOB_RUNNING_STATUS = 'RUNNING'
JOB_COMPLETED_SUCCESS_STATUS = 'COMPLETED_SUCCESS'
JOB_COMPLETED_FAILED_STATUS = 'COMPLETED_FAILED'
# ------------------Driver Variables-------------------------------
# NOTE(review): Windows-specific path; the Docker image runs Linux — confirm
# this is overridden or unused inside the container.
CHROMEDRIVER_PATH = 'C:\\Drivers\\chromedriver_win32\\chromedriver.exe'
# -----------------------------------------------------------------
from .response import CustomResponse
from .status import Status
from .job import JobModel
from .agent_utils import AgentUtils
class Agent(object):
    """Value object describing one crawler agent.

    Instances are pickled by AgentUtils, so the underscore-prefixed backing
    attributes and their property wrappers are part of the stored format.
    """

    def __init__(self, agentId, description, provider, scripts, URL):
        self.provider = provider
        self.description = description
        self.agentId = agentId
        self.scripts = scripts
        self.URL = URL

    @property
    def agentId(self):
        """Unique identifier of the agent."""
        return self._agentId

    @agentId.setter
    def agentId(self, new_value):
        self._agentId = new_value

    @property
    def description(self):
        """Human-readable summary of what the agent crawls."""
        return self._description

    @description.setter
    def description(self, new_value):
        self._description = new_value

    @property
    def provider(self):
        """Name of the data provider this agent targets."""
        return self._provider

    @provider.setter
    def provider(self, new_value):
        self._provider = new_value

    @property
    def scripts(self):
        """Mapping of script type to the callable that implements it."""
        return self._scripts

    @scripts.setter
    def scripts(self, new_value):
        self._scripts = new_value

    @property
    def URL(self):
        """Entry-point URL the agent starts crawling from."""
        return self._URL

    @URL.setter
    def URL(self, new_value):
        self._URL = new_value

    def __str__(self):
        return (f'id: {self.agentId} , description: {self.description} , '
                f'provider: {self.provider} , scripts: {self.scripts} , '
                f'URL: {self.URL}')
import os
import pickle
from .agent_class import Agent
class AgentUtils:
    """Persists Agent objects to a pickle file and reads them back."""

    def __init__(self):
        # path of the pickle file; must be set before any read/write
        self.filepath = None

    @property
    def filepath(self):
        """Path of the pickle file backing this store."""
        return self._filepath

    @filepath.setter
    def filepath(self, value):
        self._filepath = value

    def __readPklFile(self):
        """Return the stored agent list, or [] when the file does not exist."""
        # fixed: file handle is now closed deterministically via `with`
        # NOTE(review): pickle.load executes arbitrary code if the file is
        # attacker-controlled — the path must stay under application control.
        if os.path.exists(self.filepath):
            with open(self.filepath, 'rb') as file_pi:
                return pickle.load(file_pi)
        else:
            return []

    def __writePklFile(self, agent_list):
        """Overwrite the store with agent_list."""
        # fixed: file handle is now closed deterministically via `with`
        with open(self.filepath, 'wb') as file_pi:
            pickle.dump(agent_list, file_pi)

    def addAgent(self, agentId, description, provider, scripts, URL):
        """Append a new Agent unless one with the same agentId already exists."""
        agent = Agent(agentId, description, provider, scripts, URL)
        agent_list = self.__readPklFile()
        for old_agent in agent_list:
            if old_agent.agentId == agent.agentId:
                print('The agent already exists', agent)
                return
        agent_list.append(agent)
        self.__writePklFile(agent_list)

    def listAgents(self):
        """Return all stored agents as plain dicts (scripts included)."""
        return_list = []
        agent_list = self.__readPklFile()
        for old_agent in agent_list:
            agent = {}
            agent['agentId'] = old_agent.agentId
            agent['description'] = old_agent.description
            agent['provider'] = old_agent.provider
            agent['scripts'] = old_agent.scripts
            agent['URL'] = old_agent.URL
            return_list.append(agent)
        return return_list
from common import Log
class JobModel(object):
    """Read-side model for job status lookups."""

    def status(self, jobId):
        """Look up the latest status of *jobId* in Elasticsearch via Log."""
        return Log.from_default().get_status(jobId)
from flask import jsonify
class CustomResponse:
    """Wraps a base status dict and attaches the response payload under 'data'."""

    def __init__(self, statuscode, data):
        # fixed: copy before mutating — callers pass shared dicts (e.g. the
        # Status enum values), and writing 'data' into them in place leaked
        # one request's payload into every later response
        self.statuscode = dict(statuscode)
        self.statuscode['data'] = data

    def getres(self):
        """Return the response as a Flask JSON response."""
        return jsonify(self.statuscode)

    def getresjson(self):
        """Return the response as a plain dict."""
        return self.statuscode
import enum
class Status(enum.Enum):
    """Canonical API response envelopes.

    Each member's value is a base response dict with 'ok' (success flag),
    'http.status' (HTTP status code) and 'why' (human-readable reason);
    presumably consumed via CustomResponse — confirm against the routes.
    """
    SUCCESS = {
        'ok': True,
        'http': {'status': 200},
        'why': "request successful"
    }
    FAILURE = {
        'ok': False,
        'http': {'status': 500},
        'why': 'request failed'
    }
    ERR_SYSTEM = {
        'ok': False,
        'http': {'status': 500},
        'why': "Internal Server Error"
    }
    ERR_INVALID_DATA = {
        'ok': False,
        'http': {'status': 400},
        'why': "Invalid Data"
    }
    ERR_MISSING_PARAMETERS = {
        'ok': False,
        'http': {'status': 400},
        'why': "Data Missing"
    }
    CORRUPT_FILE = {
        'ok': False,
        'http': {'status': 500},
        'why': 'uploaded file is corrupt'
    }
    DATA_NOT_FOUND = {
        'ok': False,
        'http': {'status': 404},
        'why': 'data not found'
    }
    OPERATION_NOT_PERMITTED = {
        'ok': False,
        'http': {'status': 400},
        'why': 'operation not permitted'
    }
    ERR_GATEWAY = {
        'ok': False,
        'http': {'status': 400},
        'why': 'gateway error'
    }
    ERR_NOTFOUND_FILE = {
        'ok': False,
        'http': {'status': 400},
        'why': 'file not found'
    }
    ERR_SCHEMA_VALIDATION = {
        'ok': False,
        'http': {'status': 400},
        'why': 'please refer api contract to check your request structure'
    }
from .agent import AgentRepo
from .job import JobRepo
import threading
import time
import uuid
import config
from common.elastic_wrapper import Log
from models import AgentUtils
class AgentRepo:
    """Runs crawler agents on background threads with a bounded wait queue.

    NOTE(review): activeThreads/waitThreads are plain lists mutated from
    multiple threads without a lock — concurrent append/remove can race;
    confirm whether this relies on CPython GIL-level atomicity.
    """
    def __init__(self):
        self.agentUtils = AgentUtils()
        # jobIds currently executing (bounded by config.MAX_RUNNING_JOBS)
        self.activeThreads = []
        # jobIds waiting for a free run slot (bounded by config.MAX_WAITING_JOBS)
        self.waitThreads = []
    def list(self, filepath):
        """Return all agents from the pickle store, with 'scripts' stripped."""
        self.agentUtils.filepath = filepath
        result = self.agentUtils.listAgents()
        for agent in result:
            agent.pop('scripts')
        return result
    def waitAndStart(self, agentRunContext, target_script):
        """Block until a run slot frees up, then execute target_script.

        Runs on its own 'wait-<jobId>' thread; polls the active list every
        10 seconds, and keeps the slot occupied until the job thread dies.
        """
        # log waiting state
        log = Log(agentRunContext)
        log.job(config.JOB_RUNNING_STATUS, "JOB in waiting state.")
        del log
        # code to check and run if activeThreads is empty
        while True:
            if len(self.activeThreads) < config.MAX_RUNNING_JOBS:
                self.activeThreads.append(agentRunContext.jobId)
                self.waitThreads.remove(agentRunContext.jobId)
                thread = threading.Thread(target=target_script, args=(
                    agentRunContext,), name=agentRunContext.jobId)
                thread.start()
                # check if thread alive
                while thread.is_alive():
                    time.sleep(10)
                # remove thread after completion
                self.activeThreads.remove(agentRunContext.jobId)
                break
            else:
                time.sleep(10)
        return None
    def run(self, agentRunContext, filepath):
        """Queue a job for the agent matching requestBody['agentId'].

        Returns {'jobId': ...} when queued, a message dict when the wait
        queue is full, or None when no agent matches.
        """
        threadStarted = False
        agentRunContext.jobId = str(uuid.uuid4())
        self.agentUtils.filepath = filepath
        agents_list = self.agentUtils.listAgents()
        # NOTE(review): threadStarted is initialised twice; this second
        # assignment is redundant.
        threadStarted = False
        for agent in agents_list:
            if agent['agentId'] == agentRunContext.requestBody['agentId']:
                agentRunContext.URL = agent['URL']
                threadStarted = True
                if len(self.waitThreads) < config.MAX_WAITING_JOBS:
                    self.waitThreads.append(agentRunContext.jobId)
                    # hand off to waitAndStart on a named wait-thread
                    thread = threading.Thread(target=self.waitAndStart, args=(
                        agentRunContext, agent['scripts'][config.AGENT_SCRIPT_TYPES[agentRunContext.jobType]]), name=str('wait-'+agentRunContext.jobId))
                    thread.start()
                else:
                    return {'message': 'Already many jobs are in Waiting ... Please retry after some time.'}
        if threadStarted:
            return {'jobId': agentRunContext.jobId}
        else:
            return None
from models import JobModel
class JobRepo:
    """Repository facade that delegates job-status queries to JobModel."""

    def __init__(self):
        self.jobModel = JobModel()

    def status(self, jobId):
        """Return the status payload for *jobId* from the underlying model."""
        return self.jobModel.status(jobId)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment