-- 按间隔3天持续7天的方式计算drop table if exists bi_register_clustering_user_detail;create table bi_register_clustering_user_detail(appsource string,appkey string,uid string,remain_flag string) partitioned by (pt_day string);-- 日志行为表建表语句drop table if exists bi_clustering_behavior_daily_state;create table bi_clustering_behavior_daily_state(appsource string,appkey string,uid string,remain_flag string,behavior_type string,behavior_flag string) partitioned by (pt_day string);-- Mysql行转列的表show create table `bi_clustering_behavior_daily_state`;CREATE TABLE `bi_clustering_behavior_daily_state` ( `appsource` varchar(8) DEFAULT NULL, `appkey` varchar(18) DEFAULT NULL, `uid` varchar(18) DEFAULT NULL, `remain_flag` int(11) DEFAULT NULL, `pt_day` varchar(10) DEFAULT NULL, `MessageSend` int(11) DEFAULT NULL, `PhoneBinding` int(11) DEFAULT NULL, `RoomSubscribe` int(11) DEFAULT NULL, `Gifts` int(11) DEFAULT NULL, `Pay` int(11) DEFAULT NULL, `VideoPlayaccess` int(11) DEFAULT NULL, `GameZoneAccess` int(11) DEFAULT NULL, `LivePRogramStatus` int(11) DEFAULT NULL, `WatchLongerThan5Minutes` int(11) DEFAULT NULL, `etl_time` datetime DEFAULT NULL) ENGINE=InnoDB DEFAULT CHARSET=utf8;2、日期处理代码/Users/nisj/PycharmProjects/BiDataProc/UserBehavior/DateProc.py# -*- coding=utf-8 -*-import warningsimport datetimewarnings.filterwarnings("ignore")def getNowDay(): DayNow = datetime.datetime.today().strftime('%Y-%m-%d') return DayNowdef getFristDay(): FristDay=datetime.datetime.strptime('2017-01-01', '%Y-%m-%d').strftime('%Y-%m-%d') return FristDaydef dateRange(beginDate, endDate): dates = [] dt = datetime.datetime.strptime(beginDate, "%Y-%m-%d") date = beginDate[:] while date <= endDate: dates.append(date) dt = dt + datetime.timedelta(1) date = dt.strftime("%Y-%m-%d") return datesdef weekRang(beginDate, endDate): week = set() for date in dateRange(beginDate, endDate): week.add(datetime.date(int(date[0:4]), int(date[5:7]), int(date[8:10])).isocalendar()[0:2]) wk_l = [] for wl in sorted(list(week)): wk_l.append(str(wl[0])+'#'+str(wl[1])) return wk_ldef getWeekFristday(weekflag): yearnum = weekflag[0:4] # 取到年份 weeknum = weekflag[5:7] # 取到周 stryearstart = yearnum + '0101' # 当年第一天 yearstart = datetime.datetime.strptime(stryearstart, '%Y%m%d') # 格式化为日期格式 yearstartcalendarmsg = yearstart.isocalendar() # 当年第一天的周信息 yearstartweekday = yearstartcalendarmsg[2] yearstartyear = yearstartcalendarmsg[0] if yearstartyear < int(yearnum): daydelat = (8 - int(yearstartweekday)) + (int(weeknum) - 1) * 7 else: daydelat = (8 - int(yearstartweekday)) + (int(weeknum) - 2) * 7 week1day = (yearstart + datetime.timedelta(days=daydelat)).date() return week1daydef getWeekLastday(weekflag): week7day = getWeekFristday(weekflag) + datetime.timedelta(days=6) return week7daydef getInterval3Last7(run_day): Interval3Last7_start_day = datetime.datetime.strptime(run_day, '%Y-%m-%d') + datetime.timedelta(days=4) Interval3Last7_end_day = datetime.datetime.strptime(run_day, '%Y-%m-%d') + datetime.timedelta(days=10) return run_day,Interval3Last7_start_day.strftime('%Y-%m-%d'),Interval3Last7_end_day.strftime('%Y-%m-%d')# Batch Test# print dateRange(getFristDay(), getNowDay())# print weekRang(getFristDay(), getNowDay())# for weekFrist in weekRang(getFristDay(), getNowDay()):# for weekSecond in weekRang(getFristDay(), getNowDay()):# if (weekFrist[0:4] == weekSecond[0:4] and int(weekFrist[5:]) + 1 == int(weekSecond[5:])): #暂未考虑跨年# if getWeekLastday(weekflag=weekFrist).strftime('%Y-%m-%d') < getNowDay() and getWeekLastday(weekflag=weekSecond).strftime('%Y-%m-%d') < getNowDay():# print weekFrist, weekSecond# print getWeekFristday(weekflag=weekFrist), getWeekLastday(weekflag=weekFrist), getWeekFristday(weekflag=weekSecond), getWeekLastday(weekflag=weekSecond)# 间隔日期时间的获取# run_day='2017-02-03'# print getInterval3Last7(run_day)# for run_day in dateRange(getFristDay(), (datetime.datetime.strptime(getNowDay(), '%Y-%m-%d') - datetime.timedelta(days=11)).strftime('%Y-%m-%d')):# print getInterval3Last7(run_day)# run_day = getInterval3Last7(run_day)[0]# Interval3Last7_start_day = getInterval3Last7(run_day)[1]# Interval3Last7_end_day = getInterval3Last7(run_day)[2]# print run_day,Interval3Last7_start_day,Interval3Last7_end_day3、是否留存用户数据分群处理业务处理:/Users/nisj/PycharmProjects/BiDataProc/UserBehavior/Hive_userClustering_detail_proc.py# -*- coding=utf-8 -*-from DateProc import *import oswarnings.filterwarnings("ignore")def register_clustering_user_detail_Interval3Last7(run_day, Interval3Last7_start_day, Interval3Last7_end_day): os.system("""source /etc/profile; / /usr/lib/hive-current/bin/hive -e " / add jar /home/hadoop/nisj/udf-jar/hadoop_udf_radixChange.jar; / create temporary function RadixChange as 'com.kascend.hadoop.RadixChange'; / alter table bi_register_clustering_user_detail drop if exists partition (pt_day='%s'); / alter table bi_register_clustering_user_detail add partition (pt_day='%s'); / with tab_user_register_info as( / select uid,appsource,appkey,pt_day from bi_all_register_info / where pt_day = '%s' / ), / tab_access_log_Interval3Last7 as ( / select RadixChange(lower(uid),16,10) uid from bi_all_access_log / where pt_day >= '%s' and pt_day <= '%s' / group by RadixChange(lower(uid),16,10)) / insert overwrite table bi_register_clustering_user_detail partition(pt_day='%s') / select a1.appsource,a1.appkey,a1.uid,case when a2.uid is not null then 1 else 0 end remain_flag / from tab_user_register_info a1 / left join tab_access_log_Interval3Last7 a2 on a1.uid=a2.uid / ;" / """ % (run_day, run_day, run_day, Interval3Last7_start_day, Interval3Last7_end_day, run_day))# Batch Test并行调度:/Users/nisj/PycharmProjects/BiDataProc/UserBehavior/BatchThread_clusteringUserDetail.py# -*- coding=utf-8 -*-import threadpoolimport timefrom Hive_userClustering_detail_proc import *warnings.filterwarnings("ignore")today = datetime.date.today()yesterday = today - datetime.timedelta(days=1)tomorrow = today + datetime.timedelta(days=1)now_time = time.strftime('%Y-%m-%d %X', time.localtime())print "当前时间是:",now_time# 间隔3天持续7天数据计算batch_runDay_list = []for run_day in dateRange(getFristDay(), (datetime.datetime.strptime(getNowDay(), '%Y-%m-%d') - datetime.timedelta(days=11)).strftime('%Y-%m-%d')): run_day = getInterval3Last7(run_day)[0] Interval3Last7_start_day = getInterval3Last7(run_day)[1] Interval3Last7_end_day = getInterval3Last7(run_day)[2] batch_runDay_list.append(([run_day,Interval3Last7_start_day,Interval3Last7_end_day], None))requests = []request_register_remain_user = threadpool.makeRequests(register_clustering_user_detail_Interval3Last7, batch_runDay_list)requests.extend(request_register_remain_user)main_pool = threadpool.ThreadPool(6)[main_pool.putRequest(req) for req in requests]if __name__ == '__main__': while True: try: time.sleep(3) main_pool.poll() except KeyboardInterrupt: print("**** Interrupted!") break except threadpool.NoResultsPending: break if main_pool.dismissedWorkers: print("Joining all dismissed worker threads...") main_pool.joinAllDismissedWorkers()now_time = time.strftime('%Y-%m-%d %X', time.localtime())print "当前时间是:",now_time4、用户注册当日用户行为处理业务处理:/Users/nisj/PycharmProjects/BiDataProc/UserBehavior/Hive_ClusteringBehavior_proc.py# -*- coding=utf-8 -*-import osimport refrom DateProc import *warnings.filterwarnings("ignore")def behaviorDailyTab_partitionAdd(run_day): os.system("""source /etc/profile; / /usr/lib/hive-current/bin/hive -e " / alter table bi_clustering_behavior_daily_state drop if exists partition (pt_day='%s'); / alter table bi_clustering_behavior_daily_state add partition (pt_day='%s'); / " / """ % (run_day, run_day))def behavior_MessageSend(run_day): os.system("""source /etc/profile; / /usr/lib/hive-current/bin/hive -e " / insert into table bi_clustering_behavior_daily_state partition (pt_day='%s') / select a1.appsource,a1.appkey,a1.uid,a1.remain_flag,'MessageSend' behavior_type,case when a2.uid is not null then 1 else 0 end behavior_flag / from (select * from bi_register_clustering_user_detail where pt_day='%s') a1 / left join (select uid from data_chushou_message_send where pt_day='%s' group by uid) a2 on a1.uid=a2.uid;" / """ % (run_day, run_day, run_day))def behavior_PhoneBinding(run_day): os.system("""source /etc/profile; / /usr/lib/hive-current/bin/hive -e " / insert into table bi_clustering_behavior_daily_state partition (pt_day='%s') / select a1.appsource,a1.appkey,a1.uid,a1.remain_flag,'PhoneBinding' behavior_type,case when a2.uid is not null then 1 else 0 end behavior_flag / from (select * from bi_register_clustering_user_detail where pt_day='%s') a1 / left join (select uid from data_chushou_user_phone_num where state=0 and pt_day='%s' group by uid) a2 on a1.uid=a2.uid;" / """ % (run_day, run_day, run_day))def behavior_RoomSubscribe(run_day): os.system("""source /etc/profile; / /usr/lib/hive-current/bin/hive -e " / insert into table bi_clustering_behavior_daily_state partition (pt_day='%s') / select a1.appsource,a1.appkey,a1.uid,a1.remain_flag,'RoomSubscribe' behavior_type,case when a2.uid is not null then 1 else 0 end behavior_flag / from (select * from bi_register_clustering_user_detail where pt_day='%s') a1 / left join (select uid from data_chushou_room_subscriber where state=0 and pt_day='%s' group by uid) a2 on a1.uid=a2.uid;" / """ % (run_day, run_day, run_day))def behavior_Pay(run_day): os.system("""source /etc/profile; / /usr/lib/hive-current/bin/hive -e " / insert into table bi_clustering_behavior_daily_state partition (pt_day='%s') / select a1.appsource,a1.appkey,a1.uid,a1.remain_flag,'Pay' behavior_type,case when a2.uid is not null then 1 else 0 end behavior_flag / from (select * from bi_register_clustering_user_detail where pt_day='%s') a1 / left join (select uid from data_chushou_pay_info where state=0 and pt_day='%s' group by uid) a2 on a1.uid=a2.uid;" / """ % (run_day, run_day, run_day))def behavior_Gifts(run_day): os.system("""source /etc/profile; / /usr/lib/hive-current/bin/hive -e " / insert into table bi_clustering_behavior_daily_state partition (pt_day='%s') / select a1.appsource,a1.appkey,a1.uid,a1.remain_flag,'Gifts' behavior_type,case when a2.uid is not null then 1 else 0 end behavior_flag / from (select * from bi_register_clustering_user_detail where pt_day='%s') a1 / left join (select fromuid uid from bi_all_jellyfish_log where logtype=1 and pt_day='%s' group by fromuid) a2 on a1.uid=a2.uid;" / """ % (run_day, run_day, run_day))def behavior_GameZoneAccess(run_day): os.system("""source /etc/profile; / /usr/lib/hive-current/bin/hive -e " / insert into table bi_clustering_behavior_daily_state partition (pt_day='%s') / select a1.appsource,a1.appkey,a1.uid,a1.remain_flag,'GameZoneAccess' behavior_type,case when a2.uid is not null then 1 else 0 end behavior_flag / from (select * from bi_register_clustering_user_detail where pt_day='%s') a1 / left join (select uid from bi_all_jellyfish_log where logtype=5 and pt_day='%s' group by uid) a2 on a1.uid=a2.uid;" / """ % (run_day, run_day, run_day))def behavior_VideoPlayAccess(run_day): os.system("""source /etc/profile; / /usr/lib/hive-current/bin/hive -e " / insert into table bi_clustering_behavior_daily_state partition (pt_day='%s') / select a1.appsource,a1.appkey,a1.uid,a1.remain_flag,'VideoPlayAccess' behavior_type,case when a2.uid is not null then 1 else 0 end behavior_flag / from (select * from bi_register_clustering_user_detail where pt_day='%s') a1 / left join (select uid from bi_all_jellyfish_log where logtype=6 and pt_day='%s' group by uid) a2 on a1.uid=a2.uid;" / """ % (run_day, run_day, run_day))def behavior_WatchLongerThan5Minutes(run_day): os.system("""source /etc/profile; / /usr/lib/hive-current/bin/hive -e " / insert into table bi_clustering_behavior_daily_state partition (pt_day='%s') / select a1.appsource,a1.appkey,a1.uid,a1.remain_flag,'WatchLongerThan5Minutes' behavior_type,case when a2.uid is not null then 1 else 0 end behavior_flag / from (select * from bi_register_clustering_user_detail where pt_day='%s') a1 / left join (select uid from data_chushou_daily_heartbeat_stat_v2 where pt_day='%s' group by uid having count(*)>=5) a2 on a1.uid=a2.uid;" / """ % (run_day, run_day, run_day))def behavior_LiveProgramStatus(run_day): os.system("""source /etc/profile; / /usr/lib/hive-current/bin/hive -e " / insert into table bi_clustering_behavior_daily_state partition (pt_day='%s') / select a1.appsource,a1.appkey,a1.uid,a1.remain_flag,'LiveProgramStatus' behavior_type,case when a2.uid is not null then 1 else 0 end behavior_flag / from (select * from bi_register_clustering_user_detail where pt_day='%s') a1 / left join (select x2.uid from (select room_id from data_chushou_live_history_status where pt_day='%s' group by room_id) x1 inner join (select id room_id,creator_uid uid from data_chushou_live_uid_status where state=0) x2 on x1.room_id=x2.room_id group by x2.uid) a2 on a1.uid=a2.uid;" / """ % (run_day, run_day, run_day))def behavior_batchCtl(run_day): behaviorDailyTab_partitionAdd(run_day) # 分区的添加 behavior_MessageSend(run_day) #用户行为数据的插入 behavior_PhoneBinding(run_day) behavior_RoomSubscribe(run_day) behavior_Pay(run_day) behavior_Gifts(run_day) behavior_GameZoneAccess(run_day) behavior_VideoPlayAccess(run_day) behavior_WatchLongerThan5Minutes(run_day) behavior_LiveProgramStatus(run_day)def getBehaviorDayList(): dayMinMax_data = os.popen ("""source /etc/profile; / /usr/lib/hive-current/bin/hive -e " / select min(pt_day),max(pt_day) from bi_register_clustering_user_detail;" / """).readlines(); DayMM_list = [] for dm_list in dayMinMax_data: dm = re.split('/t', dm_list.replace('/n', '')) DayMM_list.append(dm) for daym in DayMM_list: beginDate = daym[0] endDate = daym[1] return dateRange(beginDate, endDate)# Batch Test# run_day = '2017-01-02'# behavior_batchCtl(run_day)# for run_day in getBehaviorDayList():# print run_day并行调度:/Users/nisj/PycharmProjects/BiDataProc/UserBehavior/BatchThread_Behavior.py# -*- coding=utf-8 -*-import threadpoolimport timefrom Hive_ClusteringBehavior_proc import *warnings.filterwarnings("ignore")today = datetime.date.today()yesterday = today - datetime.timedelta(days=1)tomorrow = today + datetime.timedelta(days=1)now_time = time.strftime('%Y-%m-%d %X', time.localtime())print "当前时间是:",now_time# 计算2017年的数据信息batch_runDay_list = getBehaviorDayList()requests = []request_behavior_batchCtl = threadpool.makeRequests(behavior_batchCtl, batch_runDay_list)requests.extend(request_behavior_batchCtl)main_pool = threadpool.ThreadPool(10)[main_pool.putRequest(req) for req in requests]if __name__ == '__main__': while True: try: time.sleep(30) main_pool.poll() except KeyboardInterrupt: print("**** Interrupted!") break except threadpool.NoResultsPending: break if main_pool.dismissedWorkers: print("Joining all dismissed worker threads...") main_pool.joinAllDismissedWorkers()now_time = time.strftime('%Y-%m-%d %X', time.localtime())print "当前时间是:",now_time5、用户行为结果表进行行列转换并插入到Mysql/Users/nisj/PycharmProjects/BiDataProc/UserBehavior/Hive_ClusteringBehavior_line2col.py# -*- coding=utf-8 -*-import osimport reimport timefrom DateProc import *warnings.filterwarnings("ignore")def behaviorLine2Col_intoMysql(): os.system("""/usr/bin/mysql -hMysqlHost -PMysqlPort -uMysqlUser -pMysqlPass -e "use funnyai_data; / truncate table bi_clustering_behavior_daily_state; / " """ ) behaviorData = os.popen("""source /etc/profile; / /usr/lib/hive-current/bin/hive -e " / select appsource,appkey,uid,remain_flag,pt_day, / max(case when behavior_type='MessageSend' then behavior_flag else null end) MessageSend, / max(case when behavior_type='PhoneBinding' then behavior_flag else null end) PhoneBinding, / max(case when behavior_type='RoomSubscribe' then behavior_flag else null end) RoomSubscribe, / max(case when behavior_type='Gifts' then behavior_flag else null end) Gifts, / max(case when behavior_type='Pay' then behavior_flag else null end) Pay, / max(case when behavior_type='VideoPlayAccess' then behavior_flag else null end) VideoPlayAccess, / max(case when behavior_type='GameZoneAccess' then behavior_flag else null end) GameZoneAccess, / max(case when behavior_type='LiveProgramStatus' then behavior_flag else null end) LiveProgramStatus, / max(case when behavior_type='WatchLongerThan5Minutes' then behavior_flag else null end) WatchLongerThan5Minutes / from bi_clustering_behavior_daily_state / where pt_day is not null / group by appsource,appkey,uid,remain_flag,pt_day / ;" / """ ).readlines(); behavior_list = [] for bhL in behaviorData: bh = re.split('/t', bhL.replace('/n', '')) behavior_list.append(bh) i = 0 insert_mysql_sql = """/usr/bin/mysql -hMysqlHost -PMysqlPort -uMysqlUser -pMysqlPass -e "use funnyai_data; / insert into bi_clustering_behavior_daily_state(appsource, appkey, uid, remain_flag, pt_day, MessageSend, PhoneBinding, RoomSubscribe, Gifts, Pay, VideoPlayAccess, GameZoneAccess, LiveProgramStatus, WatchLongerThan5Minutes, etl_time) / values """ for behavior_line in behavior_list: appsource = behavior_line[0] appkey = behavior_line[1] uid = behavior_line[2] remain_flag = behavior_line[3] pt_day = behavior_line[4] MessageSend = behavior_line[5] PhoneBinding = behavior_line[6] RoomSubscribe = behavior_line[7] Gifts = behavior_line[8] Pay = behavior_line[9] VideoPlayAccess = behavior_line[10] GameZoneAccess = behavior_line[11] LiveProgramStatus = behavior_line[12] WatchLongerThan5Minutes = behavior_line[13] etl_time = time.strftime('%Y-%m-%d %X', time.localtime()) i += 1 insert_mysql_sql = insert_mysql_sql + """('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s'),""" % (appsource, appkey, uid, remain_flag, pt_day, MessageSend, PhoneBinding, RoomSubscribe, Gifts, Pay, VideoPlayAccess, GameZoneAccess, LiveProgramStatus, WatchLongerThan5Minutes, etl_time) if (i % 1000 == 0): insert_mysql_sql = insert_mysql_sql.rstrip(',') + """ ;" """ os.system(insert_mysql_sql) insert_mysql_sql = """/usr/bin/mysql -hMysqlHost -PMysqlPort -uMysqlUser -pMysqlPass -e "use funnyai_data; / insert into bi_clustering_behavior_daily_state(appsource, appkey, uid, remain_flag, pt_day, MessageSend, PhoneBinding, RoomSubscribe, Gifts, Pay, VideoPlayAccess, GameZoneAccess, LiveProgramStatus, WatchLongerThan5Minutes, etl_time) / values """ insert_mysql_sql = insert_mysql_sql.rstrip(',') + """ ;" """ os.system(insert_mysql_sql) # print insert_mysql_sqldef getBehaviorDayList(): dayMinMax_data = os.popen ("""source /etc/profile; / /usr/lib/hive-current/bin/hive -e " / select min(pt_day),max(pt_day) from bi_clustering_behavior_daily_state;" / """).readlines(); DayMM_list = [] for dm_list in dayMinMax_data: dm = re.split('/t', dm_list.replace('/n', '')) DayMM_list.append(dm) for daym in DayMM_list: beginDate = daym[0] endDate = daym[1] return dateRange(beginDate, endDate)# Batch Test# for run_day in getBehaviorDayList():# print run_daybehaviorLine2Col_intoMysql()注意Mysql数据批量插入的方法使用!
新闻热点
疑难解答