Unverified commit 0ea279e9 authored by Dhiraj Suthar, committed by GitHub

Merge pull request #1 from dileep-gadiraju/develop

addition: main_server_code, scripts, docs
Showing with 1154 additions and 0 deletions
import config
import scrapy
from common import Log
from scrapy.crawler import CrawlerRunner
from twisted.internet import reactor
def AppliedScrapy(agentRunContext):
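# Scrapy-based crawler for applied.com: runs AppliedSpider inside a Twisted reactor and logs every scraped product through Log.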
log = Log(agentRunContext)
log.job(config.JOB_RUNNING_STATUS, 'Job Started')
class AppliedSpider(scrapy.Spider):
name = 'applied'
custom_settings = {
"LOG_ENABLED": False
}
user_agent = 'Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/44.0.2403.157 Safari/537.36'
def __init__(self, search_param=''):
self.api_url = 'https://www.applied.com'
self.start_urls = [
'https://www.applied.com/search?page=0&search-category=all&override=true&isLevelUp=false&q='+search_param]
super().__init__()
def parse(self, response):
# search url parsing
for scrape_url in response.xpath('//a[@class="hide-for-print more-detail"]/@href').extract():
# extract product url
yield scrapy.Request(self.api_url+scrape_url, self.collect_data)
# extract next page url and re-run function
next_page = response.xpath('//a[@class="next"]/@href').get()
if next_page is not None:
yield scrapy.Request(self.api_url+next_page, self.parse)
# product url parsing
def collect_data(self, response):
# specification data
spec = dict()
for trs in response.xpath('//*[@id="specifications"]//table//tr'):
key = trs.xpath('./td[1]/text()').get().strip()
value = trs.xpath('./td[2]/text()').get().strip()
spec[key] = value
# final data
data = {
'company': response.xpath('//h1[@itemprop="brand"]/a/text()').get().strip(),
'product': response.xpath('//span[@itemprop="mpn name"]/text()').get().strip(),
'details': response.xpath('//div[@class="details"]//text()').get().strip(),
'item': response.xpath('//div[@class="customer-part-number"]/text()').get().strip(),
'description': [x.strip() for x in response.xpath('//div[@class="short-description"]/ul/li/text()').extract()],
'specification': spec,
'url': response.url.strip(),
}
log.data(data)
runner = CrawlerRunner()
d = runner.crawl(
AppliedSpider, search_param=agentRunContext.requestBody.get('search'))
d.addBoth(lambda _: reactor.stop())
reactor.run()
log.job(config.JOB_COMPLETED_SUCCESS_STATUS,
'Successfully scraped all data')
import os
import time
import traceback
import config
from common import Log, get_driver
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
def AppliedSelenium(agentRunContext):
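# Selenium-based crawler for applied.com: walks the paginated search results, opens each product in a new window and logs the extracted fields.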
log = Log(agentRunContext)
try:
url = 'https://www.applied.com/search?q=:relevance:FTS:' + \
agentRunContext.requestBody['search'] + \
'&page=<page>&search-category=all&override=true&isLevelUp=false'
download_dir_id = str(agentRunContext.jobId)
download_dir = os.path.join(
os.getcwd(), 'temp', 'temp-' + download_dir_id)
driver = get_driver(download_dir)
driver.maximize_window()
driver.get(url)
wait = WebDriverWait(driver, 20)
log.job(config.JOB_RUNNING_STATUS, 'Job Started')
try:
wait.until(EC.element_to_be_clickable(
(By.ID, "CybotCookiebotDialogBodyButtonAccept")))
driver.find_element_by_id(
"CybotCookiebotDialogBodyButtonAccept").click()
except:
pass
for page_no in range(1, 1000):
driver.get(url.replace('<page>', str(page_no)))
time.sleep(2)
if 'page' not in driver.current_url:
break
wait.until(EC.presence_of_element_located(
(By.CLASS_NAME, 'product-list')))
for item in driver.find_elements_by_xpath('//a[@itemprop="url"][.="View more details"]'):
href = item.get_attribute('href')
driver.switch_to.new_window()
driver.get(href)
time.sleep(2)
wait.until(EC.presence_of_element_located((By.TAG_NAME, 'h1')))
item_dict = {
'brand': driver.find_element_by_tag_name('h1').text.strip(),
'name': driver.find_element_by_xpath('//*[@itemprop= "mpn name"]').text.strip(),
'details': driver.find_element_by_class_name('details').text.strip(),
'item_no': driver.find_element_by_class_name('customer-part-number').text.strip(),
'company': driver.find_element_by_xpath('//h1[@itemprop="brand"]/a').text.strip(),
'product': driver.find_element_by_xpath('//span[@itemprop="mpn name"]').text.strip(),
'item': driver.find_element_by_xpath('//div[@class="customer-part-number"]').text.strip()
}
item_dict['short_description'] = list()
des = driver.find_element_by_class_name('short-description')
for ele in des.find_elements_by_xpath('.//li'):
item_dict['short_description'].append(ele.text.strip())
item_dict['specification'] = dict()
spe = driver.find_element_by_id('specifications')
for table in spe.find_elements_by_xpath('.//table'):
for tr_ele in table.find_elements_by_xpath('./tbody/tr'):
key = str(tr_ele.find_element_by_xpath(
'./td[1]').text).strip()
value = str(tr_ele.find_element_by_xpath(
'./td[2]').text).strip()
item_dict['specification'][key] = value
print(item_dict['specification'])
try:
log.data(item_dict)
except:
pass
driver.close()
driver.switch_to.window(driver.window_handles[0])
log.job(config.JOB_COMPLETED_SUCCESS_STATUS,
'Successfully scraped all data')
except Exception as e:
log.job(config.JOB_COMPLETED_FAILED_STATUS, str(e))
log.info('exception', traceback.format_exc())
driver.quit()
import config
import scrapy
from common import Log
from scrapy.crawler import CrawlerRunner
from twisted.internet import reactor
# search_param=do630 voltage regulator (via category list)
# search_param=do 360 voltage (via product list)
# search_param=61HH68 (via direct product page)
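# variables so eval() can parse the null/true/false literals in the embedded JS state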
null = 'null'
true = 'true'
false = 'false'
def GraingerScrapy(agentRunContext):
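# Scrapy-based crawler for grainger.com: parses the embedded window.__PRELOADED_STATE__ object to discover products, then scrapes each product detail page.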
log = Log(agentRunContext)
class GraingerScrapy(scrapy.Spider):
name = 'GraingerScrapy'
user_agent = 'Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/44.0.2403.157 Safari/537.36'
main_url = 'https://www.grainger.com/'
def __init__(self, search_param):
self.start_urls = [
"https://www.grainger.com/search?searchQuery="+search_param]
super().__init__()
def parse(self, response):
if 'search?' not in response.url:
yield scrapy.Request(url=response.url, callback=self.collect_data)
else:
if len(response.css('section[aria-label="Category products"]')) > 0:
script = [i.strip() for i in response.css('script::text').extract(
) if i.strip().startswith('window.__PRELOADED_STATE__')][0]
script = eval(script.split(
'=', 1)[-1].split('window.__UI_CONFIG__')[0].strip()[:-1])
products = list(script['category']['category']
['skuToProductMap'].keys())
href = '/product/info?productArray='+','.join(products)
yield scrapy.Request(url=self.main_url+href, callback=self.get_products)
else:
# iterate every categories
for href in response.css('a.route::attr(href)').extract():
yield scrapy.Request(url=self.main_url+href, callback=self.parse_category_page)
def parse_category_page(self, response):
script = [i.strip() for i in response.css('script::text').extract(
) if i.strip().startswith('window.__PRELOADED_STATE__')][0]
script = eval(script.split('=', 1)
[-1].split('window.__UI_CONFIG__')[0].strip()[:-1])
cat_id = script['category']['category']['id']
for i in script['category']['collections']:
coll_id = i['id']
url1 = self.main_url + \
'/experience/pub/api/products/collection/{0}?categoryId={1}'
yield scrapy.Request(url=url1.format(coll_id, cat_id), callback=self.get_products)
def get_products(self, response):
data = response.json()
if 'products' in data.keys():
for i in data['products']:
yield scrapy.Request(url=self.main_url+i['productDetailUrl'], callback=self.collect_data)
else:
for i in data.values():
if type(i) == dict and 'productDetailUrl' in i.keys():
yield scrapy.Request(url=self.main_url+i['productDetailUrl'], callback=self.collect_data)
def collect_data(self, response):
data = dict()
main_content = response.css('.product-detail__content--large')
spec = response.css('.specifications')
data = {
'brand': main_content.css('.product-detail__brand--link::text').get().strip(),
'product-heading': main_content.css('.product-detail__heading::text').get().strip(),
'url': response.url
}
for li in main_content.css('.product-detail__product-identifiers-content'):
key = li.css(
'.product-detail__product-identifiers-label::text').get().strip()
value = li.css(
'.product-detail__product-identifiers-description::text').extract()
value = [str(i).strip() for i in value] if len(
value) > 1 else str(value[0]).strip()
data[key] = value
for li in spec.css('.specifications__item'):
key = li.css('.specifications__description::text').get()
value = li.css('.specifications__value::text').extract()
value = [str(i).strip() for i in value] if len(
value) > 1 else str(value[0]).strip()
data[key] = value
log.data(data)
log.job(config.JOB_RUNNING_STATUS, 'Job Started')
runner = CrawlerRunner()
d = runner.crawl(
GraingerScrapy, search_param=agentRunContext.requestBody.get('search'))
d.addBoth(lambda _: reactor.stop())
reactor.run()
log.job(config.JOB_COMPLETED_SUCCESS_STATUS,
'Successfully scraped all data')
\ No newline at end of file
import os
import time
import traceback
import config
from common import Log, get_driver
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
def GraingerSelenium(agentRunContext):
log = Log(agentRunContext)
log.job(config.JOB_RUNNING_STATUS, 'Job Started')
log.job(config.JOB_RUNNING_STATUS, 'Script Under Development')
log.job(config.JOB_COMPLETED_SUCCESS_STATUS,
'Successfully scraped all data')
# Scrapy
from .applied_scrapy import AppliedScrapy
from .grainger_scrapy import GraingerScrapy
# Selenium
from .applied_selenium import AppliedSelenium
from .grainger_selenium import GraingerSelenium
\ No newline at end of file
import config
import scrapy
from common import Log
from scrapy.crawler import CrawlerRunner
from twisted.internet import reactor
def AppliedScrapy(agentRunContext):
log = Log(agentRunContext)
log.job(config.JOB_RUNNING_STATUS, 'Job Started')
log.job(config.JOB_RUNNING_STATUS, 'Script Under Development')
log.job(config.JOB_COMPLETED_SUCCESS_STATUS,
'Successfully scraped all data')
import os
import time
import traceback
import config
from common import Log, get_driver
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
def AppliedSelenium(agentRunContext):
log = Log(agentRunContext)
log.job(config.JOB_RUNNING_STATUS, 'Job Started')
log.job(config.JOB_RUNNING_STATUS, 'Script Under Development')
log.job(config.JOB_COMPLETED_SUCCESS_STATUS,
'Successfully scraped all data')
import scrapy
from scrapy.pipelines.files import FilesPipeline
# search_param=do630 voltage regulator (via category list)
# search_param=do 360 voltage (via product list)
# search_param=61HH68 (via direct product page)
# variables for eval() to parse
null = 'null'
true = 'true'
false = 'false'
def GraingerScrapy(agentRunContext):
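# Scrapy spider plus a FilesPipeline that downloads the documentation PDFs linked from grainger.com product pages.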
class GeneralFilesItem(scrapy.Item):
file_name = scrapy.Field()
file_urls = scrapy.Field()
files = scrapy.Field()
class GenreralFilesPipeline(FilesPipeline):
def get_media_requests(self, item, info):
for my_url in item.get('file_urls', []):
yield scrapy.Request(my_url, meta={'file_name': item.get('file_name')})
def file_path(self, request, response=None, info=None):
return request.meta['file_name']
class GriengerPDFScrapy(scrapy.Spider):
name = 'GriengerPDFScrapy'
user_agent = 'Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/44.0.2403.157 Safari/537.36'
main_url = 'https://www.grainger.com/'
custom_settings = {
'ITEM_PIPELINES': {'grienger_scrapy_pdf.GenreralFilesPipeline': 1},
'FILES_STORE': '/home/test/Music/down/'
}
def __init__(self, agentRunContext):
self.start_urls = [
"https://www.grainger.com/search?searchQuery="+agentRunContext.requestBody['search']]
super().__init__()
def parse(self, response):
if 'search?' not in response.url:
yield scrapy.Request(url=response.url, callback=self.collect_data)
else:
if len(response.css('section[aria-label="Category products"]')) > 0:
script = [i.strip() for i in response.css('script::text').extract(
) if i.strip().startswith('window.__PRELOADED_STATE__')][0]
script = eval(script.split(
'=', 1)[-1].split('window.__UI_CONFIG__')[0].strip()[:-1])
products = list(script['category']['category']
['skuToProductMap'].keys())
href = '/product/info?productArray='+','.join(products)
yield scrapy.Request(url=self.main_url+href, callback=self.get_products)
else:
# iterate every categories
for href in response.css('a.route::attr(href)').extract():
yield scrapy.Request(url=self.main_url+href, callback=self.parse_category_page)
def parse_category_page(self, response):
script = [i.strip() for i in response.css('script::text').extract(
) if i.strip().startswith('window.__PRELOADED_STATE__')][0]
script = eval(script.split('=', 1)
[-1].split('window.__UI_CONFIG__')[0].strip()[:-1])
cat_id = script['category']['category']['id']
for i in script['category']['collections']:
coll_id = i['id']
url1 = self.main_url + \
'/experience/pub/api/products/collection/{0}?categoryId={1}'
yield scrapy.Request(url=url1.format(coll_id, cat_id), callback=self.get_products)
def get_products(self, response):
data = response.json()
if 'products' in data.keys():
for i in data['products']:
yield scrapy.Request(url=self.main_url+i['productDetailUrl'], callback=self.collect_data)
else:
for i in data.values():
if type(i) == dict and 'productDetailUrl' in i.keys():
yield scrapy.Request(url=self.main_url+i['productDetailUrl'], callback=self.collect_data)
def collect_data(self, response):
data = dict()
main_content = response.css('.product-detail__content--large')
for li in main_content.css('.product-detail__product-identifiers-content'):
key = li.css(
'.product-detail__product-identifiers-label::text').get().strip()
value = li.css(
'.product-detail__product-identifiers-description::text').extract()
value = [str(i).strip() for i in value] if len(
value) > 1 else str(value[0]).strip()
data[key] = value
for a_tag in response.css('a.documentation__link'):
a_href = a_tag.xpath('./@href').get()
a_name = a_tag.xpath('./@title').get().strip()
filename = data['Item #']+'-'+a_name+'.'+a_href.split('.')[-1]
item = GeneralFilesItem()
item['file_name'] = filename
item['file_urls'] = ['https:'+a_href]
yield item
import os
import shutil
import time
import traceback
import config
from common import Log, get_driver
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
def single_product(log, driver, download_dir, new_output_dir, win_handle=2):
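# Download every documentation link on the current product page into download_dir, prefix the file with the product name and move it to new_output_dir.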
try:
doc_section = driver.find_elements(
By.XPATH, '//ul[@class="documentation__content"]//li')
for link in doc_section:
download_link = link.find_element_by_tag_name(
'a').get_attribute('href')
product_name = str(driver.current_url).split('-')[-1].strip()
try:
product_name = product_name.split('-')[-1].split('?')[:1][0]
except:
pass
driver.switch_to.new_window()
driver.get(download_link)
time.sleep(5)
file_name = os.listdir(download_dir)[0]
new_file_name = product_name + "-" + file_name
os.rename(os.path.join(download_dir, file_name),
os.path.join(download_dir, new_file_name))
shutil.move(os.path.join(download_dir, new_file_name),
os.path.join(new_output_dir, new_file_name))
log.info('info', '{0} Downloaded'.format(new_file_name))
time.sleep(2)
driver.close()
driver.switch_to.window(driver.window_handles[win_handle])
except Exception as e:
log.info('exception', traceback.format_exc())
def multi_product(log, wait, driver, download_dir, new_output_dir):
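# Open each category listed on the search page, dismiss the feedback modal if it appears, then download documents for every product row via single_product().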
# Collecting details for all products available
wait.until(EC.visibility_of_element_located(
(By.XPATH, '//div[@class = "multi-tiered-category"]')))
all_product = driver.find_elements_by_xpath(
'//div[@class = "multi-tiered-category"]//ul//li/a')
all_product = [i.get_attribute('href') for i in all_product]
c_url = driver.current_url
for p_url in all_product:
driver.switch_to.new_window()
driver.get(p_url)
time.sleep(2)
try:
wait.until(EC.element_to_be_clickable(
(By.XPATH, '//div[@id="feedbackBrowseModal"]//div[@class="modal-footer"]//a[@class = "close"]')))
driver.find_element_by_xpath(
'//div[@id="feedbackBrowseModal"]//div[@class="modal-footer"]//a[@class = "close"]').click()
time.sleep(2)
except:
pass
for a_tag in driver.find_elements(By.XPATH, "//tbody//a"):
product_url = str(a_tag.get_attribute('href'))
driver.switch_to.new_window()
driver.get(product_url)
time.sleep(2)
single_product(log, driver, download_dir, new_output_dir)
driver.close()
driver.switch_to.window(driver.window_handles[1])
driver.close()
driver.switch_to.window(driver.window_handles[0])
driver.get(c_url)
time.sleep(5)
def GraingerSelenium(agentRunContext):
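# Selenium-based downloader for grainger.com: submits the search term and saves product documentation PDFs into a per-job output directory.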
log = Log(agentRunContext)
try:
download_dir_id = str(agentRunContext.jobId)
download_dir = os.path.join(
os.getcwd(), 'temp', 'temp-' + download_dir_id)
# Creating an output directory for storing PDFs
output_dir = os.path.join(os.path.normpath(os.getcwd() + os.sep + os.pardir), 'output')
try:
os.mkdir(output_dir)
except OSError:
pass
new_output_dir = os.path.join(output_dir, download_dir_id)
os.mkdir(new_output_dir)
driver = get_driver(download_dir)
driver.maximize_window()
driver.get(agentRunContext.URL)
wait = WebDriverWait(driver, 20)
log.job(config.JOB_RUNNING_STATUS, 'Job Started')
# Inputing Search-Param
driver.find_element_by_xpath(
'//input[@aria-label="Search Query"]').send_keys(agentRunContext.requestBody['search'])
time.sleep(2)
driver.find_element_by_xpath(
'//button[@aria-label="Submit Search Query"]').click()
time.sleep(5)
# If multi_products are there in search params
if len(driver.find_elements(By.XPATH, '//div[@class = "multi-tiered-category"]')) > 0:
multi_product(log, wait, driver, download_dir, new_output_dir)
# If single_products are there in search params
else:
single_product(log, driver, download_dir, new_output_dir, 0)
log.job(config.JOB_RUNNING_STATUS, 'Downloaded All Invoices')
except Exception as e:
log.job(config.JOB_COMPLETED_FAILED_STATUS, str(e))
log.info('exception', traceback.format_exc())
driver.quit()
#!/bin/bash
python3 app.py
# uwsgi --ini uwsgi.ini
[
{
"agentId": "APPLIED-SELENIUM",
"description": "Crawler For Applied",
"provider": "Applied",
"URL": "https://www.applied.com",
"scripts": {
"info": "AppliedSelenium",
"pdf": "AppliedSelenium"
}
},
{
"agentId": "APPLIED-SCRAPY",
"description": "Crawler For Applied",
"provider": "Applied",
"URL": "https://www.applied.com",
"scripts": {
"info": "AppliedScrapy",
"pdf": "AppliedScrapy"
}
},
{
"agentId": "GRAINGER-SELENIUM",
"description": "Crawler For Grainger",
"provider": "Grainger",
"URL": "https://www.grainger.com",
"scripts": {
"info": "GraingerSelenium",
"pdf": "GraingerSelenium"
}
},
{
"agentId": "GRAINGER-SCRAPY",
"description": "Crawler For Grainger",
"provider": "Grainger",
"URL": "https://www.grainger.com",
"scripts": {
"info": "GraingerScrapy",
"pdf": "GraingerScrapy"
}
}
]
\ No newline at end of file
from .agent_run_context import AgentRunContext
\ No newline at end of file
class AgentRunContext(object):
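# Per-job context shared with the crawler scripts: request body, job id, agent URL and job type, exposed as properties.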
def __init__(self, req, jobType):
self.requestBody = req
self.jobId = None
self.URL = None
self.jobType = jobType
@property
def jobId(self):
return self._jobId
@jobId.setter
def jobId(self, value):
self._jobId = value
@property
def requestBody(self):
return self._requestBody
@requestBody.setter
def requestBody(self, value):
self._requestBody = value
@property
def URL(self):
return self._URL
@URL.setter
def URL(self, value):
self._URL = value
@property
def jobType(self):
return self._jobType
@jobType.setter
def jobType(self, value):
self._jobType = value
[uwsgi]
module = wsgi:wsgi_app
master = true
processes = 5
threads = 2
http-socket = :5001
socket = ../vfs.wsgi_app.sock
chmod-socket = 660
\ No newline at end of file
src/wsgi.py 0 → 100644
from app import server as wsgi_app
if __name__ == "__main__":
wsgi_app.run()
{
"info": {
"_postman_id": "9a1bcfd6-80ac-49a6-ad43-da29f9f6c9d0",
"name": "scraping-api-collections",
"schema": "https://schema.getpostman.com/json/collection/v2.1.0/collection.json",
"_exporter_id": "14608642"
},
"item": [
{
"name": "agent-list",
"request": {
"auth": {
"type": "basic",
"basic": [
{
"key": "password",
"value": "YYYY",
"type": "string"
},
{
"key": "username",
"value": "XXXX",
"type": "string"
},
{
"key": "showPassword",
"value": false,
"type": "boolean"
}
]
},
"method": "GET",
"header": [],
"url": {
"raw": "http://0.0.0.0:5001/general/agents",
"protocol": "http",
"host": [
"0",
"0",
"0",
"0"
],
"port": "5001",
"path": [
"general",
"agents"
]
},
"description": "Retrieves the list of available agents."
},
"response": []
},
{
"name": "job-status",
"request": {
"auth": {
"type": "basic",
"basic": [
{
"key": "password",
"value": "YYYY",
"type": "string"
},
{
"key": "username",
"value": "XXXX",
"type": "string"
},
{
"key": "showPassword",
"value": false,
"type": "boolean"
}
]
},
"method": "GET",
"header": [],
"url": {
"raw": "http://0.0.0.0:5001/general/status?jobId",
"protocol": "http",
"host": [
"0",
"0",
"0",
"0"
],
"port": "5001",
"path": [
"general",
"status"
],
"query": [
{
"key": "jobId",
"value": null
}
]
},
"description": "Retrieves the status of the given jobId."
},
"response": []
},
{
"name": "agent-run",
"request": {
"auth": {
"type": "basic",
"basic": [
{
"key": "password",
"value": "YYYY",
"type": "string"
},
{
"key": "username",
"value": "XXXX",
"type": "string"
},
{
"key": "showPassword",
"value": false,
"type": "boolean"
}
]
},
"method": "POST",
"header": [],
"body": {
"mode": "raw",
"raw": "{\r\n \"agentId\": \"AGENT-ID\",\r\n \"type\": \"TYPE\",\r\n \"search\": \"MY_SEARCH_PARAM\"\r\n}",
"options": {
"raw": {
"language": "json"
}
}
},
"url": {
"raw": "http://0.0.0.0:5001/general/run",
"protocol": "http",
"host": [
"0",
"0",
"0",
"0"
],
"port": "5001",
"path": [
"general",
"run"
]
},
"description": "Initiates the job for the given parameters."
},
"response": []
}
],
"auth": {
"type": "basic",
"basic": [
{
"key": "password",
"value": "generic@123#",
"type": "string"
},
{
"key": "username",
"value": "test",
"type": "string"
}
]
},
"event": [
{
"listen": "prerequest",
"script": {
"type": "text/javascript",
"exec": [
""
]
}
},
{
"listen": "test",
"script": {
"type": "text/javascript",
"exec": [
""
]
}
}
]
}
\ No newline at end of file
%% Cell type:markdown id: tags:
# Scrapy documentation
Scrapy is a fast high-level web crawling and web scraping framework, used to crawl websites and extract structured data from their pages.
It can be used for a wide range of purposes, from data mining to monitoring and automated testing.
%% Cell type:markdown id: tags:
---
%% Cell type:markdown id: tags:
## INSTALLATION
You can install Scrapy and its dependencies from PyPI with:
> pip install Scrapy
For more information see [Installation documentation](https://docs.scrapy.org/en/latest/intro/install.html)
%% Cell type:markdown id: tags:
----
%% Cell type:markdown id: tags:
### SAMPLE SPIDER CODE
```
# file_name = quotes_spider.py
import scrapy


class QuotesSpider(scrapy.Spider):
    name = 'quotes'
    start_urls = [
        'https://quotes.toscrape.com/tag/humor/',
    ]

    def parse(self, response):
        for quote in response.css('div.quote'):
            yield {
                'author': quote.xpath('span/small/text()').get(),
                'text': quote.css('span.text::text').get(),
            }

        next_page = response.css('li.next a::attr("href")').get()
        if next_page is not None:
            yield response.follow(next_page, self.parse)
```
%% Cell type:markdown id: tags:
To run your Scrapy spider:
> scrapy runspider quotes_spider.py -o quotes.json
%% Cell type:markdown id: tags:
## What just happened?
When you ran the command `scrapy runspider quotes_spider.py`, Scrapy looked for a Spider definition inside it and ran it through its crawler engine.
The crawl started by making requests to the URLs defined in the start_urls attribute (in this case, only the URL for quotes in the humor category) and called the default callback method parse, passing the response object as an argument. In the parse callback, we loop through the quote elements using a CSS Selector, yield a Python dict with the extracted quote text and author, look for a link to the next page and schedule another request using the same parse method as callback.
Here you can see one of the main advantages of Scrapy: requests are scheduled and processed asynchronously. This means that Scrapy doesn't need to wait for a request to be finished and processed; it can send another request or do other things in the meantime. It also means that other requests can keep going even if some request fails or an error happens while handling it.
%% Cell type:markdown id: tags:
---
%% Cell type:markdown id: tags:
### Simplest way to dump all my scraped items into a JSON/CSV/XML file?
To dump into a JSON file:
> scrapy crawl myspider -O items.json
To dump into a CSV file:
> scrapy crawl myspider -O items.csv
To dump into an XML file:
> scrapy crawl myspider -O items.xml
For more information see [Feed exports](https://docs.scrapy.org/en/latest/topics/feed-exports.html)
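Exports can also be configured in code instead of on the command line; a minimal sketch, assuming a recent Scrapy (2.4+) where the `FEEDS` setting and its `overwrite` option are available:
```
import scrapy


class QuotesSpider(scrapy.Spider):
    name = 'quotes'
    start_urls = ['https://quotes.toscrape.com/tag/humor/']
    # Equivalent of running the spider with `-O items.json`
    custom_settings = {
        'FEEDS': {'items.json': {'format': 'json', 'overwrite': True}},
    }

    def parse(self, response):
        for quote in response.css('div.quote'):
            yield {'text': quote.css('span.text::text').get()}
```
With this in place, `scrapy runspider quotes_spider.py` writes items.json without any -O flag.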
---
%% Cell type:markdown id: tags:
scrapy project example : [quotesbot](https://github.com/scrapy/quotesbot)
---
%% Cell type:markdown id: tags:
### Learn to Extract data
The best way to learn how to extract data with Scrapy is trying selectors using the Scrapy shell.
Run:
> scrapy shell 'https://quotes.toscrape.com/page/1/'
Using the shell, you can try selecting elements using CSS with the response object:
> >>> response.css('title')
> [<Selector xpath='descendant-or-self::title' data='<title>Quotes to Scrape</title>'>]
The result of running response.css('title') is a list-like object called SelectorList, which represents a list of Selector objects that wrap around XML/HTML elements and allow you to run further queries to fine-grain the selection or extract the data.
To extract the text from the title above, you can do:
> >>> response.css('title::text').getall()
> ['Quotes to Scrape']
There are two things to note here: one is that we've added ::text to the CSS query, to mean we want to select only the text elements directly inside the <title> element.
The other thing is that the result of calling .getall() is a list: it is possible that a selector returns more than one result, so we extract them all. When you know you just want the first result, as in this case, you can do:
> >>> response.css('title::text').get()
> 'Quotes to Scrape'
As an alternative, you could’ve written:
> >>> response.css('title::text')[0].get()
> 'Quotes to Scrape'
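The same selection can also be written with XPath (Scrapy translates CSS queries to XPath under the hood):
> >>> response.xpath('//title/text()').get()
> 'Quotes to Scrape'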
---
%% Cell type:markdown id: tags:
## Run Scrapy from a script
You can use the API to run Scrapy from a script, instead of the typical way of running Scrapy via `scrapy crawl`.
Remember that Scrapy is built on top of the Twisted asynchronous networking library, so you need to run it inside the Twisted reactor.
The first utility you can use to run your spiders is `scrapy.crawler.CrawlerProcess`.
This class will start a Twisted reactor for you, configuring the logging and setting shutdown handlers. This class is the one used by all Scrapy commands.
There is another Scrapy utility, `scrapy.crawler.CrawlerRunner`, that gives you finer control over the crawling process; when you use it, you have to shut down the Twisted reactor yourself after the spider is finished. This can be achieved by adding callbacks to the deferred returned by the `CrawlerRunner.crawl` method.
Here's an example of CrawlerRunner usage, along with a callback to manually stop the reactor after MySpider has finished running.
```
from twisted.internet import reactor
import scrapy
from scrapy.crawler import CrawlerRunner
from scrapy.utils.log import configure_logging


class MySpider(scrapy.Spider):
    # Your spider definition
    ...


configure_logging({'LOG_FORMAT': '%(levelname)s: %(message)s'})
runner = CrawlerRunner()
d = runner.crawl(MySpider)
d.addBoth(lambda _: reactor.stop())
reactor.run()  # the script will block here until the crawling is finished
```
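For comparison, here is a minimal sketch of the same run using `scrapy.crawler.CrawlerProcess`, which starts and stops the reactor for you, so no manual `reactor.stop()` callback is needed:
```
import scrapy
from scrapy.crawler import CrawlerProcess


class MySpider(scrapy.Spider):
    # Your spider definition
    ...


process = CrawlerProcess(settings={'LOG_FORMAT': '%(levelname)s: %(message)s'})
process.crawl(MySpider)
process.start()  # starts the reactor and blocks until the crawl is finished
```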
import json
import time
from elasticsearch import Elasticsearch
import scrapy
from scrapy import Request
class AppliedSpider(scrapy.Spider):
name = 'applied'
user_agent = 'Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/44.0.2403.157 Safari/537.36'
def __init__(self, search_param=''):
self.api_url = 'https://www.applied.com'
self.start_urls = [
'https://www.applied.com/search?page=0&search-category=all&override=true&isLevelUp=false&q='+search_param]
super().__init__()
def collect_data(self, response):
# product url parsing
# specification data
spec = dict()
for trs in response.xpath('//*[@id="specifications"]//table//tr'):
key = trs.xpath('./td[1]/text()').get().strip()
value = trs.xpath('./td[2]/text()').get().strip()
spec[key] = value
# final data
data = {
'company': response.xpath('//h1[@itemprop="brand"]/a/text()').get().strip(),
'product': response.xpath('//span[@itemprop="mpn name"]/text()').get().strip(),
'details': response.xpath('//div[@class="details"]//text()').get().strip(),
'item': response.xpath('//div[@class="customer-part-number"]/text()').get().strip(),
'description': [x.strip() for x in response.xpath('//div[@class="short-description"]/ul/li/text()').extract()],
'specification': spec,
'url': response.url.strip(),
'timestamp': int(time.time()*1000)
}
yield data
def parse(self, response):
# search url parsing
for scrape_url in response.xpath('//a[@class="hide-for-print more-detail"]/@href').extract():
# extract product url
yield Request(self.api_url+scrape_url, self.collect_data)
# extract next page url and re-run function
next_page = response.xpath('//a[@class="next"]/@href').get()
if next_page is not None:
yield Request(self.api_url+next_page, self.parse)
import scrapy
class RSSpider(scrapy.Spider):
crawler = 'RSSpider'
name = 'RSSpider'
main_domain = 'https://in.rsdelivers.com'
start_urls = ['https://in.rsdelivers.com/productlist/search?query=749']
def parse(self,response):
for ele in response.css('a.snippet'):
my_href = ele.xpath('./@href').get()
yield scrapy.Request(url=self.main_domain+my_href,callback=self.collect_data)
def collect_data(self,response):
data = dict()
meta_data = response.css('div.row-inline::text').extract()
for i in range(0,100,3):
try:
data[meta_data[i]] = meta_data[i+2]
except:
break
data['title'] = str(response.css('h1.title::text').get()).strip()
data['url'] = response.url
yield data