首页 > 编程 > Python > 正文

Python计算注册用户当日发生行为示例

2019-11-08 01:55:11
字体:
来源:转载
供稿:网友
思路:以新注册用户在注册后间隔三天、持续七天的时间窗口内是否留存作为分群依据,查看用户注册当日在弹幕发送、手机绑定等方面的用户行为信息。

1、用到的目标Hive及MySQL表
-- Hive: newly registered users, flagged by whether they were retained in the
-- window that starts after a 3-day gap and lasts 7 days.
drop table if exists bi_register_clustering_user_detail;
create table bi_register_clustering_user_detail(
    appsource string,
    appkey string,
    uid string,
    remain_flag string
) partitioned by (pt_day string);

-- Hive: DDL for the per-day behavior log table (one row per user and behavior type).
drop table if exists bi_clustering_behavior_daily_state;
create table bi_clustering_behavior_daily_state(
    appsource string,
    appkey string,
    uid string,
    remain_flag string,
    behavior_type string,
    behavior_flag string
) partitioned by (pt_day string);

-- MySQL: pivoted (row-to-column) table, one column per behavior type.
-- Column names are spelled exactly as in the INSERT column list used by
-- Hive_ClusteringBehavior_line2col.py.
show create table `bi_clustering_behavior_daily_state`;
CREATE TABLE `bi_clustering_behavior_daily_state` (
  `appsource` varchar(8) DEFAULT NULL,
  `appkey` varchar(18) DEFAULT NULL,
  `uid` varchar(18) DEFAULT NULL,
  `remain_flag` int(11) DEFAULT NULL,
  `pt_day` varchar(10) DEFAULT NULL,
  `MessageSend` int(11) DEFAULT NULL,
  `PhoneBinding` int(11) DEFAULT NULL,
  `RoomSubscribe` int(11) DEFAULT NULL,
  `Gifts` int(11) DEFAULT NULL,
  `Pay` int(11) DEFAULT NULL,
  `VideoPlayAccess` int(11) DEFAULT NULL,
  `GameZoneAccess` int(11) DEFAULT NULL,
  `LiveProgramStatus` int(11) DEFAULT NULL,
  `WatchLongerThan5Minutes` int(11) DEFAULT NULL,
  `etl_time` datetime DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- 2. Date handling code: /Users/nisj/PycharmProjects/BiDataProc/UserBehavior/DateProc.py
# -*- coding: utf-8 -*-
"""Date helpers shared by the user-behavior batch jobs (DateProc.py).

Other modules pull these names in with ``from DateProc import *`` and also
rely on ``warnings`` and ``datetime`` being re-exported that way, so no
``__all__`` is declared here.
"""
import warnings
import datetime

warnings.filterwarnings("ignore")

_DAY_FORMAT = '%Y-%m-%d'


def getNowDay():
    """Return today's local date as a 'YYYY-MM-DD' string."""
    return datetime.datetime.today().strftime(_DAY_FORMAT)


def getFristDay():
    """Return the first day covered by the batch jobs, '2017-01-01'."""
    return datetime.datetime.strptime('2017-01-01', _DAY_FORMAT).strftime(_DAY_FORMAT)


def dateRange(beginDate, endDate):
    """Return every day from beginDate to endDate (inclusive) as 'YYYY-MM-DD' strings.

    Both arguments are 'YYYY-MM-DD' strings. The bounds are parsed and compared
    as dates rather than as raw strings, so inputs that are not zero-padded
    (e.g. '2017-1-5') still give a correct, normalized range. An empty list is
    returned when beginDate is after endDate.

    Raises:
        ValueError: if either argument does not match '%Y-%m-%d'.
    """
    current = datetime.datetime.strptime(beginDate, _DAY_FORMAT)
    last = datetime.datetime.strptime(endDate, _DAY_FORMAT)
    one_day = datetime.timedelta(days=1)
    dates = []
    while current <= last:
        dates.append(current.strftime(_DAY_FORMAT))
        current += one_day
    return dates


def weekRang(beginDate, endDate):
    """Return the sorted ISO weeks touched by [beginDate, endDate].

    Each entry is '<iso_year>#<iso_week>' with no zero padding, e.g. '2017#1'.
    """
    weeks = set()
    for day in dateRange(beginDate, endDate):
        iso = datetime.datetime.strptime(day, _DAY_FORMAT).date().isocalendar()
        weeks.add((iso[0], iso[1]))
    return ['%d#%d' % week for week in sorted(weeks)]


def getWeekFristday(weekflag):
    """Return the Monday (datetime.date) of the ISO week named by weekflag.

    weekflag has the shape produced by weekRang: a 4-digit ISO year, one
    separator character, then the week number (e.g. '2017#1' or '2017#12').
    """
    iso_year = int(weekflag[0:4])
    iso_week = int(weekflag[5:7])
    # ISO week 1 is the week that contains January 4th; step back to its Monday.
    jan4 = datetime.date(iso_year, 1, 4)
    week1_monday = jan4 - datetime.timedelta(days=jan4.isoweekday() - 1)
    return week1_monday + datetime.timedelta(weeks=iso_week - 1)


def getWeekLastday(weekflag):
    """Return the Sunday (datetime.date) of the ISO week named by weekflag."""
    return getWeekFristday(weekflag) + datetime.timedelta(days=6)


def getInterval3Last7(run_day):
    """Return (run_day, window_start, window_end) for the retention window.

    The window starts 4 days after run_day (leaving a 3-day gap) and ends
    10 days after run_day, i.e. it spans 7 days inclusive. All three values
    are 'YYYY-MM-DD' strings; run_day is returned exactly as passed in.
    """
    base = datetime.datetime.strptime(run_day, _DAY_FORMAT)
    window_start = base + datetime.timedelta(days=4)
    window_end = base + datetime.timedelta(days=10)
    return run_day, window_start.strftime(_DAY_FORMAT), window_end.strftime(_DAY_FORMAT)


# Batch Test
# print dateRange(getFristDay(), getNowDay())
# print weekRang(getFristDay(), getNowDay())
# for weekFrist in weekRang(getFristDay(), getNowDay()):
#     for weekSecond in weekRang(getFristDay(), getNowDay()):
#         if (weekFrist[0:4] == weekSecond[0:4] and int(weekFrist[5:]) + 1 == int(weekSecond[5:])):  # year rollover not handled yet
#             if getWeekLastday(weekflag=weekFrist).strftime('%Y-%m-%d') < getNowDay() and getWeekLastday(weekflag=weekSecond).strftime('%Y-%m-%d') < getNowDay():
#                 print weekFrist, weekSecond
#                 print getWeekFristday(weekflag=weekFrist), getWeekLastday(weekflag=weekFrist), getWeekFristday(weekflag=weekSecond), getWeekLastday(weekflag=weekSecond)

# Deriving the interval dates
# run_day='2017-02-03'
# print getInterval3Last7(run_day)
# for run_day in dateRange(getFristDay(), (datetime.datetime.strptime(getNowDay(), '%Y-%m-%d') - datetime.timedelta(days=11)).strftime('%Y-%m-%d')):
#     print getInterval3Last7(run_day)
#     run_day = getInterval3Last7(run_day)[0]
#     Interval3Last7_start_day = getInterval3Last7(run_day)[1]
#     Interval3Last7_end_day = getInterval3Last7(run_day)[2]
#     print run_day,Interval3Last7_start_day,Interval3Last7_end_day

# 3. Retained-user clustering -- business logic:
# /Users/nisj/PycharmProjects/BiDataProc/UserBehavior/Hive_userClustering_detail_proc.py
# -*- coding: utf-8 -*-
"""Fill bi_register_clustering_user_detail in Hive (Hive_userClustering_detail_proc.py)."""
from DateProc import *
import os

warnings.filterwarnings("ignore")


def register_clustering_user_detail_Interval3Last7(run_day, Interval3Last7_start_day, Interval3Last7_end_day):
    """Rebuild one pt_day partition of bi_register_clustering_user_detail.

    Users registered on run_day (from bi_all_register_info) are left-joined
    against the uids seen in bi_all_access_log between Interval3Last7_start_day
    and Interval3Last7_end_day (inclusive); remain_flag is 1 when a matching
    access row exists and 0 otherwise. The partition is dropped and re-added
    first, so the call can be repeated for the same day.

    Args:
        run_day: registration day, 'YYYY-MM-DD'.
        Interval3Last7_start_day: first day of the access window, 'YYYY-MM-DD'.
        Interval3Last7_end_day: last day of the access window, 'YYYY-MM-DD'.

    Returns:
        The status returned by os.system for the hive command.
    """
    # NOTE(review): RadixChange(lower(uid),16,10) presumably converts a hex uid
    # to decimal so it matches the register table's uid -- confirm against the UDF.
    hql_template = " ".join([
        "add jar /home/hadoop/nisj/udf-jar/hadoop_udf_radixChange.jar;",
        "create temporary function RadixChange as 'com.kascend.hadoop.RadixChange';",
        "alter table bi_register_clustering_user_detail drop if exists partition (pt_day='%(run_day)s');",
        "alter table bi_register_clustering_user_detail add partition (pt_day='%(run_day)s');",
        "with tab_user_register_info as(",
        "select uid,appsource,appkey,pt_day from bi_all_register_info",
        "where pt_day = '%(run_day)s'",
        "),",
        "tab_access_log_Interval3Last7 as (",
        "select RadixChange(lower(uid),16,10) uid from bi_all_access_log",
        "where pt_day >= '%(start_day)s' and pt_day <= '%(end_day)s'",
        "group by RadixChange(lower(uid),16,10))",
        "insert overwrite table bi_register_clustering_user_detail partition(pt_day='%(run_day)s')",
        "select a1.appsource,a1.appkey,a1.uid,case when a2.uid is not null then 1 else 0 end remain_flag",
        "from tab_user_register_info a1",
        "left join tab_access_log_Interval3Last7 a2 on a1.uid=a2.uid",
        ";",
    ])
    hql = hql_template % {
        'run_day': run_day,
        'start_day': Interval3Last7_start_day,
        'end_day': Interval3Last7_end_day,
    }
    # The HQL is wrapped in double quotes for the shell, so it must only use
    # single quotes internally.
    return os.system('source /etc/profile; /usr/lib/hive-current/bin/hive -e "%s"' % hql)


# Batch Test

# Parallel scheduling:
# /Users/nisj/PycharmProjects/BiDataProc/UserBehavior/BatchThread_clusteringUserDetail.py
# -*- coding: utf-8 -*-
"""Run the retention-flag Hive job for every eligible day on a thread pool."""
import threadpool
import time
from Hive_userClustering_detail_proc import *

warnings.filterwarnings("ignore")

today = datetime.date.today()
yesterday = today - datetime.timedelta(days=1)
tomorrow = today + datetime.timedelta(days=1)

now_time = time.strftime('%Y-%m-%d %X', time.localtime())
# %-formatting inside print(...) emits the same text on Python 2 and Python 3.
print("当前时间是: %s" % now_time)

# Compute the "3-day gap, 7-day window" data. The last registration day
# scheduled is 11 days before today; the window built by getInterval3Last7
# ends 10 days after the registration day.
last_run_day = (datetime.datetime.strptime(getNowDay(), '%Y-%m-%d') - datetime.timedelta(days=11)).strftime('%Y-%m-%d')
batch_runDay_list = []
for run_day in dateRange(getFristDay(), last_run_day):
    # One call yields all three dates of the window.
    run_day, Interval3Last7_start_day, Interval3Last7_end_day = getInterval3Last7(run_day)
    # threadpool.makeRequests takes (positional_args, keyword_args) pairs.
    batch_runDay_list.append(([run_day, Interval3Last7_start_day, Interval3Last7_end_day], None))

requests = []
request_register_remain_user = threadpool.makeRequests(register_clustering_user_detail_Interval3Last7, batch_runDay_list)
requests.extend(request_register_remain_user)
main_pool = threadpool.ThreadPool(6)
for req in requests:
    main_pool.putRequest(req)

if __name__ == '__main__':
    # Poll every 3 seconds until the pool reports that no results are pending.
    while True:
        try:
            time.sleep(3)
            main_pool.poll()
        except KeyboardInterrupt:
            print("**** Interrupted!")
            break
        except threadpool.NoResultsPending:
            break
    if main_pool.dismissedWorkers:
        print("Joining all dismissed worker threads...")
        main_pool.joinAllDismissedWorkers()

now_time = time.strftime('%Y-%m-%d %X', time.localtime())
print("当前时间是: %s" % now_time)

# 4. Registration-day user behavior -- business logic:
# /Users/nisj/PycharmProjects/BiDataProc/UserBehavior/Hive_ClusteringBehavior_proc.py
# -*- coding: utf-8 -*-
"""Record registration-day behavior flags in Hive (Hive_ClusteringBehavior_proc.py).

Each behavior_* function appends one row per registered user to
bi_clustering_behavior_daily_state for the given day, with behavior_flag = 1
when the user appears in that behavior's source query and 0 otherwise.
"""
import os
import re
from DateProc import *

warnings.filterwarnings("ignore")


def _run_hive(hql):
    """Run hql through the hive CLI and return the os.system status."""
    # The HQL is wrapped in double quotes for the shell, so it must only use
    # single quotes internally.
    return os.system('source /etc/profile; /usr/lib/hive-current/bin/hive -e "%s"' % hql)


def _insert_behavior_flag(run_day, behavior_type, behavior_source):
    """Insert one behavior_type row per user registered on run_day.

    behavior_source is a sub-query exposing a ``uid`` column; users found in it
    get behavior_flag 1, all others 0.
    """
    hql = (
        "insert into table bi_clustering_behavior_daily_state partition (pt_day='%(run_day)s') "
        "select a1.appsource,a1.appkey,a1.uid,a1.remain_flag,'%(behavior_type)s' behavior_type,"
        "case when a2.uid is not null then 1 else 0 end behavior_flag "
        "from (select * from bi_register_clustering_user_detail where pt_day='%(run_day)s') a1 "
        "left join (%(behavior_source)s) a2 on a1.uid=a2.uid;"
    ) % {
        'run_day': run_day,
        'behavior_type': behavior_type,
        'behavior_source': behavior_source,
    }
    return _run_hive(hql)


def behaviorDailyTab_partitionAdd(run_day):
    """Drop (if present) and re-add the run_day partition of the behavior table."""
    return _run_hive(
        "alter table bi_clustering_behavior_daily_state drop if exists partition (pt_day='%s'); "
        "alter table bi_clustering_behavior_daily_state add partition (pt_day='%s');"
        % (run_day, run_day)
    )


def behavior_MessageSend(run_day):
    """Flag users with a row in data_chushou_message_send on run_day."""
    return _insert_behavior_flag(
        run_day, 'MessageSend',
        "select uid from data_chushou_message_send where pt_day='%s' group by uid" % run_day)


def behavior_PhoneBinding(run_day):
    """Flag users with a state=0 row in data_chushou_user_phone_num on run_day."""
    return _insert_behavior_flag(
        run_day, 'PhoneBinding',
        "select uid from data_chushou_user_phone_num where state=0 and pt_day='%s' group by uid" % run_day)


def behavior_RoomSubscribe(run_day):
    """Flag users with a state=0 row in data_chushou_room_subscriber on run_day."""
    return _insert_behavior_flag(
        run_day, 'RoomSubscribe',
        "select uid from data_chushou_room_subscriber where state=0 and pt_day='%s' group by uid" % run_day)


def behavior_Pay(run_day):
    """Flag users with a state=0 row in data_chushou_pay_info on run_day."""
    return _insert_behavior_flag(
        run_day, 'Pay',
        "select uid from data_chushou_pay_info where state=0 and pt_day='%s' group by uid" % run_day)


def behavior_Gifts(run_day):
    """Flag users appearing as fromuid in bi_all_jellyfish_log (logtype=1) on run_day."""
    return _insert_behavior_flag(
        run_day, 'Gifts',
        "select fromuid uid from bi_all_jellyfish_log where logtype=1 and pt_day='%s' group by fromuid" % run_day)


def behavior_GameZoneAccess(run_day):
    """Flag users with a logtype=5 row in bi_all_jellyfish_log on run_day."""
    return _insert_behavior_flag(
        run_day, 'GameZoneAccess',
        "select uid from bi_all_jellyfish_log where logtype=5 and pt_day='%s' group by uid" % run_day)


def behavior_VideoPlayAccess(run_day):
    """Flag users with a logtype=6 row in bi_all_jellyfish_log on run_day."""
    return _insert_behavior_flag(
        run_day, 'VideoPlayAccess',
        "select uid from bi_all_jellyfish_log where logtype=6 and pt_day='%s' group by uid" % run_day)


def behavior_WatchLongerThan5Minutes(run_day):
    """Flag users with at least 5 rows in data_chushou_daily_heartbeat_stat_v2 on run_day."""
    # NOTE(review): presumably one heartbeat row per minute watched -- confirm.
    return _insert_behavior_flag(
        run_day, 'WatchLongerThan5Minutes',
        "select uid from data_chushou_daily_heartbeat_stat_v2 where pt_day='%s' group by uid having count(*)>=5" % run_day)


def behavior_LiveProgramStatus(run_day):
    """Flag users who created (state=0) a room listed in data_chushou_live_history_status on run_day."""
    return _insert_behavior_flag(
        run_day, 'LiveProgramStatus',
        "select x2.uid from "
        "(select room_id from data_chushou_live_history_status where pt_day='%s' group by room_id) x1 "
        "inner join (select id room_id,creator_uid uid from data_chushou_live_uid_status where state=0) x2 "
        "on x1.room_id=x2.room_id group by x2.uid" % run_day)


def behavior_batchCtl(run_day):
    """Reset the run_day partition, then insert every behavior type for that day."""
    behaviorDailyTab_partitionAdd(run_day)  # (re)create the partition first
    behavior_MessageSend(run_day)  # then insert the behavior rows
    behavior_PhoneBinding(run_day)
    behavior_RoomSubscribe(run_day)
    behavior_Pay(run_day)
    behavior_Gifts(run_day)
    behavior_GameZoneAccess(run_day)
    behavior_VideoPlayAccess(run_day)
    behavior_WatchLongerThan5Minutes(run_day)
    behavior_LiveProgramStatus(run_day)


def getBehaviorDayList():
    """Return every day between min(pt_day) and max(pt_day) of bi_register_clustering_user_detail.

    The bounds are read from the last tab-separated line the hive CLI prints.
    An empty list is returned when hive prints no usable row or reports NULL
    bounds, instead of failing on unbound locals.
    """
    pipe = os.popen(
        'source /etc/profile; /usr/lib/hive-current/bin/hive -e "'
        'select min(pt_day),max(pt_day) from bi_register_clustering_user_detail;"')
    try:
        output_lines = pipe.readlines()
    finally:
        pipe.close()

    rows = [line.replace('\n', '').split('\t') for line in output_lines]
    rows = [fields for fields in rows if len(fields) >= 2]
    if not rows:
        return []
    beginDate, endDate = rows[-1][0], rows[-1][1]
    # NOTE(review): assumes the hive CLI prints SQL NULL as the text 'NULL'.
    if 'NULL' in (beginDate, endDate):
        return []
    return dateRange(beginDate, endDate)


# Batch Test
# run_day = '2017-01-02'
# behavior_batchCtl(run_day)
# for run_day in getBehaviorDayList():
#     print run_day

# Parallel scheduling:
# /Users/nisj/PycharmProjects/BiDataProc/UserBehavior/BatchThread_Behavior.py
# -*- coding: utf-8 -*-
"""Run behavior_batchCtl for every available day on a thread pool."""
import threadpool
import time
from Hive_ClusteringBehavior_proc import *

warnings.filterwarnings("ignore")

today = datetime.date.today()
yesterday = today - datetime.timedelta(days=1)
tomorrow = today + datetime.timedelta(days=1)

now_time = time.strftime('%Y-%m-%d %X', time.localtime())
# %-formatting inside print(...) emits the same text on Python 2 and Python 3.
print("当前时间是: %s" % now_time)

# Process the 2017 data: one request per day returned by getBehaviorDayList().
batch_runDay_list = getBehaviorDayList()

requests = []
request_behavior_batchCtl = threadpool.makeRequests(behavior_batchCtl, batch_runDay_list)
requests.extend(request_behavior_batchCtl)
main_pool = threadpool.ThreadPool(10)
for req in requests:
    main_pool.putRequest(req)

if __name__ == '__main__':
    # Poll every 30 seconds until the pool reports that no results are pending.
    while True:
        try:
            time.sleep(30)
            main_pool.poll()
        except KeyboardInterrupt:
            print("**** Interrupted!")
            break
        except threadpool.NoResultsPending:
            break
    if main_pool.dismissedWorkers:
        print("Joining all dismissed worker threads...")
        main_pool.joinAllDismissedWorkers()

now_time = time.strftime('%Y-%m-%d %X', time.localtime())
print("当前时间是: %s" % now_time)

# 5. Pivot the behavior result table (rows to columns) and load it into MySQL:
# /Users/nisj/PycharmProjects/BiDataProc/UserBehavior/Hive_ClusteringBehavior_line2col.py
# -*- coding: utf-8 -*-
"""Pivot the Hive behavior table and load it into MySQL (Hive_ClusteringBehavior_line2col.py)."""
import os
import re
import time
from DateProc import *

warnings.filterwarnings("ignore")

# Behavior columns, in the order they are selected from Hive and inserted into MySQL.
_BEHAVIOR_COLUMNS = [
    'MessageSend', 'PhoneBinding', 'RoomSubscribe', 'Gifts', 'Pay',
    'VideoPlayAccess', 'GameZoneAccess', 'LiveProgramStatus', 'WatchLongerThan5Minutes',
]
_KEY_COLUMNS = ['appsource', 'appkey', 'uid', 'remain_flag', 'pt_day']
_HIVE_FIELD_COUNT = len(_KEY_COLUMNS) + len(_BEHAVIOR_COLUMNS)
_INSERT_BATCH_SIZE = 1000


def _run_mysql(sql):
    """Run sql against funnyai_data through the mysql CLI; return the os.system status."""
    return os.system(
        '/usr/bin/mysql -hMysqlHost -PMysqlPort -uMysqlUser -pMysqlPass -e "use funnyai_data; %s" ' % sql)


def _insert_rows(value_tuples):
    """Insert already-formatted '(...)' value tuples with one multi-row INSERT."""
    if not value_tuples:
        # "insert ... values ;" is a SQL syntax error, so skip empty batches.
        return
    columns = ', '.join(_KEY_COLUMNS + _BEHAVIOR_COLUMNS + ['etl_time'])
    _run_mysql('insert into bi_clustering_behavior_daily_state(%s) values %s ;'
               % (columns, ','.join(value_tuples)))


def behaviorLine2Col_intoMysql():
    """Reload the MySQL table bi_clustering_behavior_daily_state from Hive.

    The MySQL table is truncated, the Hive table is pivoted so that every
    behavior_type becomes its own column, and the rows are inserted in batches
    of 1000 per mysql invocation. etl_time is the local time at which each row
    was formatted.

    Raises:
        ValueError: if a non-blank line of hive output has fewer fields than expected.
    """
    _run_mysql('truncate table bi_clustering_behavior_daily_state;')

    pivot_columns = ', '.join(
        "max(case when behavior_type='%s' then behavior_flag else null end) %s" % (name, name)
        for name in _BEHAVIOR_COLUMNS)
    pipe = os.popen(
        'source /etc/profile; /usr/lib/hive-current/bin/hive -e "'
        'select appsource,appkey,uid,remain_flag,pt_day, %s '
        'from bi_clustering_behavior_daily_state '
        'where pt_day is not null '
        'group by appsource,appkey,uid,remain_flag,pt_day ;"' % pivot_columns)
    try:
        behaviorData = pipe.readlines()
    finally:
        pipe.close()

    # NOTE(review): values are spliced into a shell command and SQL text as-is;
    # a field containing a quote, backtick or '$' would break the statement.
    row_template = '(' + ', '.join(["'%s'"] * (_HIVE_FIELD_COUNT + 1)) + ')'
    pending = []
    for raw_line in behaviorData:
        line = raw_line.replace('\n', '')
        if not line.strip():
            continue
        fields = line.split('\t')
        if len(fields) < _HIVE_FIELD_COUNT:
            raise ValueError('unexpected hive output line: %r' % raw_line)
        etl_time = time.strftime('%Y-%m-%d %X', time.localtime())
        pending.append(row_template % tuple(fields[:_HIVE_FIELD_COUNT] + [etl_time]))
        if len(pending) == _INSERT_BATCH_SIZE:
            _insert_rows(pending)
            pending = []
    # Flush the final partial batch (a no-op when nothing is left).
    _insert_rows(pending)


def getBehaviorDayList():
    """Return every day between min(pt_day) and max(pt_day) of bi_clustering_behavior_daily_state.

    The bounds are read from the last tab-separated line the hive CLI prints.
    An empty list is returned when hive prints no usable row or reports NULL
    bounds, instead of failing on unbound locals.
    """
    pipe = os.popen(
        'source /etc/profile; /usr/lib/hive-current/bin/hive -e "'
        'select min(pt_day),max(pt_day) from bi_clustering_behavior_daily_state;"')
    try:
        output_lines = pipe.readlines()
    finally:
        pipe.close()

    rows = [line.replace('\n', '').split('\t') for line in output_lines]
    rows = [fields for fields in rows if len(fields) >= 2]
    if not rows:
        return []
    beginDate, endDate = rows[-1][0], rows[-1][1]
    # NOTE(review): assumes the hive CLI prints SQL NULL as the text 'NULL'.
    if 'NULL' in (beginDate, endDate):
        return []
    return dateRange(beginDate, endDate)


# Batch Test
# for run_day in getBehaviorDayList():
#     print run_day
behaviorLine2Col_intoMysql()

# Note the batched multi-row INSERT approach used for loading MySQL above.
发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表