大数据Hadoop之——数据采集存储到HDFS实战(Python版本)
off999 2025-07-10 19:56 15 浏览 0 评论
要实现这个示例,必须先安装好hadoop和hive环境,环境部署可以参考我之前的文章:
大数据Hadoop原理介绍+安装+实战操作(HDFS+YARN+MapReduce)
大数据Hadoop之——数据仓库Hive
【流程图如下】
【示例代码如下】
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : liugp
# @File : Data2HDFS.py
"""
# pip install sasl可能安装不成功
pip install sasl
# 可以选择离线安装
https://www.lfd.uci.edu/~gohlke/pythonlibs/#sasl
pip install sasl-0.3.1-cp37-cp37m-win_amd64.whl
pip install thrift
pip install thrift-sasl
pip install pyhive
pip install hdfs
"""
from selenium import webdriver
from pyhive import hive
from hdfs import InsecureClient
class Data2HDFS:
def __init__(self):
# 第一个步,连接到hive
conn = hive.connect(host='192.168.0.113', port=11000, username='root', database='default')
# 第二步,建立一个游标
self.cursor = conn.cursor()
self.fs = InsecureClient(url='http://192.168.0.113:9870/', user='root', root='/')
"""
采集数据
"""
def collectData(self):
try:
driver = webdriver.Edge("../drivers/msedgedriver.exe")
# 爬取1-3页数据,可自行扩展
id = 1
local_path = './data.txt'
with open(local_path, 'w', encoding='utf-8') as f:
for i in range(1, 2):
url = "https://ac.qq.com/Comic/index/page/" + str(i)
driver.get(url)
# 模拟滚动
js = "return action=document.body.scrollHeight"
new_height = driver.execute_script(js)
for i in range(0, new_height, 10):
driver.execute_script('window.scrollTo(0, %s)' % (i))
list = driver.find_element_by_class_name('ret-search-list').find_elements_by_tag_name('li')
data = []
for item in list:
imgsrc = item.find_element_by_tag_name('img').get_attribute('src')
author = item.find_element_by_class_name("ret-works-author").text
leixing_spanlist = item.find_element_by_class_name("ret-works-tags").find_elements_by_tag_name(
'span')
leixing = leixing_spanlist[0].text + "," + leixing_spanlist[1].text
neirong = item.find_element_by_class_name("ret-works-decs").text
gengxin = item.find_element_by_class_name("mod-cover-list-mask").text
itemdata = {"id": str(id), 'imgsrc': imgsrc, 'author': author, 'leixing': leixing, 'neirong': neirong,
'gengxin': gengxin}
print(itemdata)
line = itemdata['id'] +"," + itemdata['imgsrc'] +"," + itemdata['author'] + "," + itemdata['leixing'] + "," + itemdata['neirong'] + itemdata['gengxin'] + "\n"
f.write(line)
id+=1
data.append(itemdata)
# 上传文件,
d2f.uplodatLocalFile2HDFS(local_path)
except Exception as e:
print(e)
"""创建hive表"""
def createTable(self):
# 解决hive表中文乱码问题
"""
mysql -uroot -p
use hive数据库
alter table COLUMNS_V2 modify column COMMENT varchar(256) character set utf8;
alter table TABLE_PARAMS modify column PARAM_VALUE varchar(4000) character set utf8;
alter table PARTITION_PARAMS modify column PARAM_VALUE varchar(4000) character set utf8;
alter table PARTITION_KEYS modify column PKEY_COMMENT varchar(4000) character set utf8;
alter table INDEX_PARAMS modify column PARAM_VALUE varchar(4000) character set utf8;
commit;
:return:
"""
self.cursor.execute("CREATE TABLE IF NOT EXISTS default.datatable (\
id INT COMMENT 'ID',\
imgsrc STRING COMMENT 'img src',\
author STRING COMMENT 'author',\
leixing STRING COMMENT '类型',\
neirong STRING COMMENT '内容',\
gengxin STRING COMMENT '更新'\
)\
ROW FORMAT DELIMITED\
FIELDS TERMINATED BY ','\
COLLECTION ITEMS TERMINATED BY '-'\
MAP KEYS TERMINATED BY ':'\
LINES TERMINATED BY '\n'")
"""
将本地文件推送到HDFS上
"""
def uplodatLocalFile2HDFS(self, local_path):
hdfs_path = '/tmp/test0508/'
self.fs.makedirs(hdfs_path)
# 如果文件存在就必须先删掉
self.fs.delete(hdfs_path + '/' + local_path)
print(hdfs_path, local_path)
self.fs.upload(hdfs_path, local_path)
"""
将HDFS上的文件load到hive表
"""
def data2Hive(self):
# 先清空表
self.cursor.execute("truncate table datatable")
# 加载数据,这里的路径就是HDFS上的文件路径
self.cursor.execute("load data inpath '/tmp/test0508/data.txt' into table datatable")
self.cursor.execute("select * from default.datatable")
print(self.cursor.fetchall())
if __name__ == "__main__":
d2f = Data2HDFS()
# 收集数据
d2f.collectData()
# 创建hive表
# d2f.createTable()
# 将数据存储到HDFS
d2f.data2Hive()
【温馨提示】hiveserver2的默认端口是10000,我是上面写的11000端口,是因为我配置文件里修改了,如果使用的是默认端口,记得修改成10000端口,还有就是修改成自己的host地址。这个只是一种实现方式,还有其它方式。
如果小伙伴有疑问的话,欢迎给我留言,后续会更新更多关于大数据的文章,请耐心等待~
相关推荐
- Linux 网络协议栈_linux网络协议栈
-
前言;更多学习资料(包含视频、技术学习路线图谱、文档等)后台私信《资料》免费领取技术点包含了C/C++,Linux,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,Z...
- 揭秘 BPF map 前生今世_bpfdm
-
1.前言众所周知,map可用于内核BPF程序和用户应用程序之间实现双向的数据交换,为BPF技术中的重要基础数据结构。在BPF程序中可以通过声明structbpf_map_def...
- 教你简单 提取fmpeg 视频,音频,字幕 方法
-
ffmpeg提取视频,音频,字幕方法(HowtoExtractVideo,Audio,SubtitlefromOriginalVideo?)1.提取视频(ExtractVi...
- Linux内核原理到代码详解《内核视频教程》
-
Linux内核原理-进程入门进程进程不仅仅是一段可执行程序的代码,通常进程还包括其他资源,比如打开的文件,挂起的信号,内核内部的数据结构,处理器状态,内存地址空间,或多个执行线程,存放全局变量的数据段...
- Linux C Socket UDP编程详解及实例分享
-
1、UDP网络编程主要流程UDP协议的程序设计框架,客户端和服务器之间的差别在于服务器必须使用bind()函数来绑定侦听的本地UDP端口,而客户端则可以不进行绑定,直接发送到服务器地址的某个端口地址。...
- libevent源码分析之bufferevent使用详解
-
libevent的bufferevent在event的基础上自己维护了一个buffer,这样的话,就不需要再自己管理一个buffer了。先看看structbufferevent这个结构体struct...
- 一次解决Linux内核内存泄漏实战全过程
-
什么是内存泄漏:程序向系统申请内存,使用完不需要之后,不释放内存还给系统回收,造成申请的内存被浪费.发现系统中内存使用量随着时间的流逝,消耗的越来越多,例如下图所示:接下来的排查思路是:1.监控系统中...
- 彻底搞清楚内存泄漏的原因,如何避免内存泄漏,如何定位内存泄漏
-
作为C/C++开发人员,内存泄漏是最容易遇到的问题之一,这是由C/C++语言的特性引起的。C/C++语言与其他语言不同,需要开发者去申请和释放内存,即需要开发者去管理内存,如果内存使用不当,就容易造成...
- linux网络编程常见API详解_linux网络编程视频教程
-
Linux网络编程API函数初步剖析今天我们来分析一下前几篇博文中提到的网络编程中几个核心的API,探究一下当我们调用每个API时,内核中具体做了哪些准备和初始化工作。1、socket(family...
- Linux下C++访问web—使用libcurl库调用http接口发送解析json数据
-
一、背景这两天由于一些原因研究了研究如何在客户端C++代码中调用web服务端接口,需要访问url,并传入json数据,拿到返回值,并解析。 现在的情形是远程服务端的接口参数和返回类型都是json的字符...
- 平衡感知调节:“系统如人” 视角下的架构设计与业务稳定之道
-
在今天这个到处都是数字化的时代,系统可不是一堆冷冰冰的代码。它就像一个活生生的“数字人”,没了它,业务根本转不起来。总说“技术要为业务服务”,但实际操作起来问题不少:系统怎么才能快速响应业务需求?...
- 谈谈分布式文件系统下的本地缓存_什么是分布式文件存储
-
在分布式文件系统中,为了提高系统的性能,常常会引入不同类型的缓存存储系统(算法优化所带来的的效果可能远远不如缓存带来的优化效果)。在软件中缓存存储系统一般可分为了两类:一、分布式缓存,例如:Memca...
- 进程间通信之信号量semaphore--linux内核剖析
-
什么是信号量信号量的使用主要是用来保护共享资源,使得资源在一个时刻只有一个进程(线程)所拥有。信号量的值为正的时候,说明它空闲。所测试的线程可以锁定而使用它。若为0,说明它被占用,测试的线程要进入睡眠...
- Qt编写推流程序/支持webrtc265/从此不用再转码/打开新世界的大门
-
一、前言在推流领域,尤其是监控行业,现在主流设备基本上都是265格式的视频流,想要在网页上直接显示监控流,之前的方案是,要么转成hls,要么魔改支持265格式的flv,要么265转成264,如果要追求...
- 30 分钟搞定 SpringBoot 视频推拉流!实战避坑指南
-
30分钟搞定SpringBoot视频推拉流!实战避坑指南在音视频开发领域,SpringBoot凭借其快速开发特性,成为很多开发者实现视频推拉流功能的首选框架。但实际开发中,从环境搭建到流处理优...
你 发表评论:
欢迎- 一周热门
- 最近发表
- 标签列表
-
- python计时 (73)
- python安装路径 (56)
- python类型转换 (93)
- python进度条 (67)
- python吧 (67)
- python的for循环 (65)
- python格式化字符串 (61)
- python静态方法 (57)
- python列表切片 (59)
- python面向对象编程 (60)
- python 代码加密 (65)
- python串口编程 (77)
- python封装 (57)
- python写入txt (66)
- python读取文件夹下所有文件 (59)
- python操作mysql数据库 (66)
- python获取列表的长度 (64)
- python接口 (63)
- python调用函数 (57)
- python多态 (60)
- python匿名函数 (59)
- python打印九九乘法表 (65)
- python赋值 (62)
- python异常 (69)
- python元祖 (57)