package com.disneystreaming.groupwatch.edge.internal.d;

import com.bamtech.core.logging.LogDispatcher;
import com.disneystreaming.groupwatch.edge.internal.EdgeToClientEvent;
import com.disneystreaming.groupwatch.m0;
import com.disneystreaming.groupwatch.n0;
import com.dss.sdk.Session;
import com.dss.sdk.session.EventEmitterKt;
import com.dss.sdk.sockets.SocketApi;
import com.dss.sdk.sockets.SocketEvent;
import com.google.common.base.Optional;
import com.squareup.moshi.s;
import io.reactivex.Completable;
import io.reactivex.Observable;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import java.lang.reflect.ParameterizedType;
import java.util.ArrayList;
import java.util.Map;
import kotlin.collections.g0;
import kotlin.jvm.internal.h;
import kotlin.k;
import kotlin.reflect.KClass;

/* compiled from: DefaultSocketManager.kt */
/* loaded from: classes2.dex */
public final class c implements com.disneystreaming.groupwatch.edge.internal.c {
    private final LogDispatcher a;
    private final SocketApi b;
    private final Map<String, KClass<? extends EdgeToClientEvent>> c;
    private final Observable<EdgeToClientEvent> d;

    /* compiled from: RxExt.kt */
    /* loaded from: classes2.dex */
    public static final class a<T, R> implements Function {
        /* JADX WARN: Multi-variable type inference failed */
        @Override // io.reactivex.functions.Function
        /* renamed from: a, reason: merged with bridge method [inline-methods] */
        public final Optional<R> apply(T it) {
            h.g(it, "it");
            return Optional.b((EdgeToClientEvent) ((SocketEvent) it).getData());
        }
    }

    public c(Session session, LogDispatcher logger) {
        Map<String, KClass<? extends EdgeToClientEvent>> l2;
        h.g(session, "session");
        h.g(logger, "logger");
        this.a = logger;
        this.b = session.getSocketApi();
        l2 = g0.l(k.a("urn:dss:event:groupWatch:coreServices:group:created", kotlin.jvm.internal.k.b(EdgeToClientEvent.Created.class)), k.a("urn:dss:event:groupWatch:coreServices:group:createErrored", kotlin.jvm.internal.k.b(EdgeToClientEvent.GroupCreateErrored.class)), k.a("urn:dss:event:groupWatch:coreServices:reactions:reactionMulticasted", kotlin.jvm.internal.k.b(EdgeToClientEvent.ReactionMulticasted.class)), k.a("urn:dss:event:groupWatch:coreServices:group:joined", kotlin.jvm.internal.k.b(EdgeToClientEvent.Joined.class)), k.a("urn:dss:event:groupWatch:coreServices:group:joinErrored", kotlin.jvm.internal.k.b(EdgeToClientEvent.JoinErrored.class)), k.a("urn:dss:event:groupWatch:coreServices:group:profileJoined", kotlin.jvm.internal.k.b(EdgeToClientEvent.ProfileJoined.class)), k.a("urn:dss:event:groupWatch:coreServices:group:deviceJoined", kotlin.jvm.internal.k.b(EdgeToClientEvent.DeviceJoined.class)), k.a("urn:dss:event:groupWatch:coreServices:group:profileLeft", kotlin.jvm.internal.k.b(EdgeToClientEvent.ProfileLeft.class)), k.a("urn:dss:event:groupWatch:coreServices:group:deviceLeft", kotlin.jvm.internal.k.b(EdgeToClientEvent.DeviceLeft.class)), k.a("urn:dss:event:groupWatch:coreServices:group:profileLeaveErrored", kotlin.jvm.internal.k.b(EdgeToClientEvent.ProfileLeaveErrored.class)), k.a("urn:dss:event:groupWatch:coreServices:group:deviceLeaveErrored", kotlin.jvm.internal.k.b(EdgeToClientEvent.DeviceLeaveErrored.class)), k.a("urn:dss:event:groupWatch:coreServices:group:groupStateAcknowledged", kotlin.jvm.internal.k.b(EdgeToClientEvent.GroupStateAcknowledged.class)), k.a("urn:dss:event:groupWatch:coreServices:group:groupStateErrored", kotlin.jvm.internal.k.b(EdgeToClientEvent.GroupStateErrored.class)), k.a("urn:dss:event:groupWatch:coreServices:playhead:createErrored", kotlin.jvm.internal.k.b(EdgeToClientEvent.PlayheadCreateErrored.class)), k.a("urn:dss:event:groupWatch:coreServices:playhead:playheadUpdated", kotlin.jvm.internal.k.b(EdgeToClientEvent.PlayheadUpdated.class)), k.a("urn:dss:event:groupWatch:coreServices:latencyCheck:latencyCheckAcknowledged", kotlin.jvm.internal.k.b(EdgeToClientEvent.LatencyCheckAcknowledged.class)));
        this.c = l2;
        ArrayList arrayList = new ArrayList(l2.size());
        for (Map.Entry<String, KClass<? extends EdgeToClientEvent>> entry : l2.entrySet()) {
            arrayList.add(EventEmitterKt.getObservable(this.b.onMessageReceived(entry.getKey(), kotlin.jvm.a.b(entry.getValue()))));
        }
        Observable t0 = Observable.t0(arrayList);
        h.f(t0, "merge(\n            eventUrnTypeMap.map { (urn, type) ->\n                sdkSocketApi.onMessageReceived<EdgeToClientEvent>(urn, type.java).getObservable()\n            }\n        )");
        Observable<EdgeToClientEvent> r0 = t0.r0(new a()).S(m0.a).r0(n0.a);
        h.f(r0, "crossinline mapperFunction: (T) -> R?\n): Observable<R> {\n    return map { Optional.fromNullable(mapperFunction.invoke(it)) }\n        .filter { it.isPresent }\n        .map { it.get() }");
        this.d = r0;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static final void e(c this$0, SocketEvent eventToSend) {
        h.g(this$0, "this$0");
        h.g(eventToSend, "$eventToSend");
        LogDispatcher.DefaultImpls.d$default(this$0.a, this$0, "Message Sent", eventToSend, false, 8, null);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static final void f(c this$0, Throwable th) {
        h.g(this$0, "this$0");
        LogDispatcher.DefaultImpls.e$default(this$0.a, this$0, "sendMessage", th.getMessage(), false, 8, null);
    }

    @Override // com.disneystreaming.groupwatch.edge.internal.c
    public Completable a(com.disneystreaming.groupwatch.edge.internal.a<?> event) {
        h.g(event, "event");
        final SocketEvent<?> a2 = com.disneystreaming.groupwatch.edge.internal.b.a(event);
        SocketApi socketApi = this.b;
        a2.setSubject("sessionId={sdkSessionIdSubject}{profileIdSubject}");
        ParameterizedType type = s.k(SocketEvent.class, Object.class);
        h.f(type, "type");
        Completable z = socketApi.sendMessage(a2, type).x(new io.reactivex.functions.a() { // from class: com.disneystreaming.groupwatch.edge.internal.d.b
            @Override // io.reactivex.functions.a
            public final void run() {
                c.e(c.this, a2);
            }
        }).z(new Consumer() { // from class: com.disneystreaming.groupwatch.edge.internal.d.a
            @Override // io.reactivex.functions.Consumer
            public final void accept(Object obj) {
                c.f(c.this, (Throwable) obj);
            }
        });
        h.f(z, "sdkSocketApi.sendMessage(eventToSend)\n            .doOnComplete { logger.d(this, \"Message Sent\", eventToSend) }\n            .doOnError { logger.e(this, \"sendMessage\", it.message) }");
        return z;
    }

    @Override // com.disneystreaming.groupwatch.edge.internal.c
    public Observable<EdgeToClientEvent> b() {
        return this.d;
    }
}
